websocket_server.hpp
Go to the documentation of this file.
1 #pragma once
2 
3 #include <algorithm>
4 #include <chrono>
5 #include <cstdint>
6 #include <functional>
7 #include <map>
8 #include <memory>
9 #include <optional>
10 #include <shared_mutex>
11 #include <string_view>
12 #include <thread>
13 #include <unordered_map>
14 #include <unordered_set>
15 #include <vector>
16 
17 #include <nlohmann/json.hpp>
18 #include <websocketpp/config/asio.hpp>
19 #include <websocketpp/server.hpp>
20 
21 #include "callback_queue.hpp"
22 #include "common.hpp"
23 #include "parameter.hpp"
24 #include "regex_utils.hpp"
25 #include "serialization.hpp"
26 #include "server_interface.hpp"
27 #include "websocket_logging.hpp"
28 
29 // Debounce a function call (tied to the line number)
30 // This macro takes in a function and the debounce time in milliseconds
31 #define FOXGLOVE_DEBOUNCE(f, ms) \
32  { \
33  static auto last_call = std::chrono::system_clock::now(); \
34  const auto now = std::chrono::system_clock::now(); \
35  if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_call).count() > ms) { \
36  last_call = now; \
37  f(); \
38  } \
39  }
40 
41 namespace {
42 
43 constexpr uint32_t StringHash(const std::string_view str) {
44  uint32_t result = 0x811C9DC5; // FNV-1a 32-bit algorithm
45  for (char c : str) {
46  result = (static_cast<uint32_t>(c) ^ result) * 0x01000193;
47  }
48  return result;
49 }
50 
51 constexpr auto SUBSCRIBE = StringHash("subscribe");
52 constexpr auto UNSUBSCRIBE = StringHash("unsubscribe");
53 constexpr auto ADVERTISE = StringHash("advertise");
54 constexpr auto UNADVERTISE = StringHash("unadvertise");
55 constexpr auto GET_PARAMETERS = StringHash("getParameters");
56 constexpr auto SET_PARAMETERS = StringHash("setParameters");
57 constexpr auto SUBSCRIBE_PARAMETER_UPDATES = StringHash("subscribeParameterUpdates");
58 constexpr auto UNSUBSCRIBE_PARAMETER_UPDATES = StringHash("unsubscribeParameterUpdates");
59 constexpr auto SUBSCRIBE_CONNECTION_GRAPH = StringHash("subscribeConnectionGraph");
60 constexpr auto UNSUBSCRIBE_CONNECTION_GRAPH = StringHash("unsubscribeConnectionGraph");
61 constexpr auto FETCH_ASSET = StringHash("fetchAsset");
62 } // namespace
63 
64 namespace foxglove {
65 
67 
68 using ConnHandle = websocketpp::connection_hdl;
69 using OpCode = websocketpp::frame::opcode::value;
70 
71 static const websocketpp::log::level APP = websocketpp::log::alevel::app;
72 static const websocketpp::log::level WARNING = websocketpp::log::elevel::warn;
73 static const websocketpp::log::level RECOVERABLE = websocketpp::log::elevel::rerror;
74 
76 const std::unordered_map<std::string, std::string> CAPABILITY_BY_CLIENT_OPERATION = {
77  // {"subscribe", }, // No required capability.
78  // {"unsubscribe", }, // No required capability.
79  {"advertise", CAPABILITY_CLIENT_PUBLISH},
80  {"unadvertise", CAPABILITY_CLIENT_PUBLISH},
81  {"getParameters", CAPABILITY_PARAMETERS},
82  {"setParameters", CAPABILITY_PARAMETERS},
83  {"subscribeParameterUpdates", CAPABILITY_PARAMETERS_SUBSCRIBE},
84  {"unsubscribeParameterUpdates", CAPABILITY_PARAMETERS_SUBSCRIBE},
85  {"subscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH},
86  {"unsubscribeConnectionGraph", CAPABILITY_CONNECTION_GRAPH},
87  {"fetchAsset", CAPABILITY_ASSETS},
88 };
89 
91 const std::unordered_map<ClientBinaryOpcode, std::string> CAPABILITY_BY_CLIENT_BINARY_OPERATION = {
94 };
95 
96 enum class StatusLevel : uint8_t {
97  Info = 0,
98  Warning = 1,
99  Error = 2,
100 };
101 
102 constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level) {
103  switch (level) {
104  case StatusLevel::Info:
105  return APP;
107  return WARNING;
108  case StatusLevel::Error:
109  return RECOVERABLE;
110  default:
111  return RECOVERABLE;
112  }
113 }
114 
115 template <typename ServerConfiguration>
116 class Server final : public ServerInterface<ConnHandle> {
117 public:
118  using ServerType = websocketpp::server<ServerConfiguration>;
119  using ConnectionType = websocketpp::connection<ServerConfiguration>;
120  using MessagePtr = typename ServerType::message_ptr;
121  using Tcp = websocketpp::lib::asio::ip::tcp;
122 
123  explicit Server(std::string name, LogCallback logger, const ServerOptions& options);
124  virtual ~Server();
125 
126  Server(const Server&) = delete;
127  Server(Server&&) = delete;
128  Server& operator=(const Server&) = delete;
129  Server& operator=(Server&&) = delete;
130 
131  void start(const std::string& host, uint16_t port) override;
132  void stop() override;
133 
134  std::vector<ChannelId> addChannels(const std::vector<ChannelWithoutId>& channels) override;
135  void removeChannels(const std::vector<ChannelId>& channelIds) override;
136  void publishParameterValues(ConnHandle clientHandle, const std::vector<Parameter>& parameters,
137  const std::optional<std::string>& requestId = std::nullopt) override;
138  void updateParameterValues(const std::vector<Parameter>& parameters) override;
139  std::vector<ServiceId> addServices(const std::vector<ServiceWithoutId>& services) override;
140  void removeServices(const std::vector<ServiceId>& serviceIds) override;
141 
142  void setHandlers(ServerHandlers<ConnHandle>&& handlers) override;
143 
144  void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp,
145  const uint8_t* payload, size_t payloadSize) override;
146  void broadcastTime(uint64_t timestamp) override;
147  void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse& response) override;
148  void sendServiceFailure(ConnHandle clientHandle, ServiceId serviceId, uint32_t callId,
149  const std::string& message) override;
151  const MapOfSets& advertisedServices) override;
152  void sendFetchAssetResponse(ConnHandle clientHandle, const FetchAssetResponse& response) override;
153 
154  uint16_t getPort() override;
155  std::string remoteEndpointString(ConnHandle clientHandle) override;
156 
157 private:
158  struct ClientInfo {
159  std::string name;
161  std::unordered_map<ChannelId, SubscriptionId> subscriptionsByChannel;
162  std::unordered_set<ClientChannelId> advertisedChannels;
164 
165  explicit ClientInfo(const std::string& name, ConnHandle handle)
166  : name(name)
167  , handle(handle) {}
168 
169  ClientInfo(const ClientInfo&) = delete;
170  ClientInfo& operator=(const ClientInfo&) = delete;
171 
172  ClientInfo(ClientInfo&&) = default;
173  ClientInfo& operator=(ClientInfo&&) = default;
174  };
175 
176  std::string _name;
180  std::unique_ptr<std::thread> _serverThread;
181  std::unique_ptr<CallbackQueue> _handlerCallbackQueue;
182 
183  uint32_t _nextChannelId = 0;
184  std::map<ConnHandle, ClientInfo, std::owner_less<>> _clients;
185  std::unordered_map<ChannelId, Channel> _channels;
186  std::map<ConnHandle, std::unordered_map<ClientChannelId, ClientAdvertisement>, std::owner_less<>>
188  std::map<ConnHandle, std::unordered_set<std::string>, std::owner_less<>>
191  std::unordered_map<ServiceId, ServiceWithoutId> _services;
193  std::shared_mutex _clientsMutex;
194  std::shared_mutex _channelsMutex;
195  std::shared_mutex _clientChannelsMutex;
196  std::shared_mutex _servicesMutex;
198 
199  struct {
205  std::shared_mutex _connectionGraphMutex;
206 
207  void setupTlsHandler();
208  void socketInit(ConnHandle hdl);
209  bool validateConnection(ConnHandle hdl);
212  void handleMessage(ConnHandle hdl, MessagePtr msg);
213  void handleTextMessage(ConnHandle hdl, MessagePtr msg);
215 
216  void sendJson(ConnHandle hdl, json&& payload);
217  void sendJsonRaw(ConnHandle hdl, const std::string& payload);
218  void sendBinary(ConnHandle hdl, const uint8_t* payload, size_t payloadSize);
219  void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level,
220  const std::string& message);
222  const std::unordered_set<std::string>& paramNames);
223  bool isParameterSubscribed(const std::string& paramName) const;
224  bool hasCapability(const std::string& capability) const;
225  bool hasHandler(uint32_t op) const;
226  void handleSubscribe(const nlohmann::json& payload, ConnHandle hdl);
227  void handleUnsubscribe(const nlohmann::json& payload, ConnHandle hdl);
228  void handleAdvertise(const nlohmann::json& payload, ConnHandle hdl);
229  void handleUnadvertise(const nlohmann::json& payload, ConnHandle hdl);
230  void handleGetParameters(const nlohmann::json& payload, ConnHandle hdl);
231  void handleSetParameters(const nlohmann::json& payload, ConnHandle hdl);
236  void handleFetchAsset(const nlohmann::json& payload, ConnHandle hdl);
237 };
238 
239 template <typename ServerConfiguration>
240 inline Server<ServerConfiguration>::Server(std::string name, LogCallback logger,
241  const ServerOptions& options)
242  : _name(std::move(name))
243  , _logger(logger)
244  , _options(options) {
245  // Redirect logging
246  _server.get_alog().set_callback(_logger);
247  _server.get_elog().set_callback(_logger);
248 
249  websocketpp::lib::error_code ec;
250  _server.init_asio(ec);
251  if (ec) {
252  throw std::runtime_error("Failed to initialize websocket server: " + ec.message());
253  }
254 
255  _server.clear_access_channels(websocketpp::log::alevel::all);
256  _server.set_access_channels(APP);
257  _server.set_tcp_pre_init_handler(std::bind(&Server::socketInit, this, std::placeholders::_1));
258  this->setupTlsHandler();
259  _server.set_validate_handler(std::bind(&Server::validateConnection, this, std::placeholders::_1));
260  _server.set_open_handler(std::bind(&Server::handleConnectionOpened, this, std::placeholders::_1));
261  _server.set_close_handler([this](ConnHandle hdl) {
262  _handlerCallbackQueue->addCallback([this, hdl]() {
263  this->handleConnectionClosed(hdl);
264  });
265  });
266  _server.set_message_handler([this](ConnHandle hdl, MessagePtr msg) {
267  _handlerCallbackQueue->addCallback([this, hdl, msg]() {
268  this->handleMessage(hdl, msg);
269  });
270  });
271  _server.set_reuse_addr(true);
272  _server.set_listen_backlog(128);
273 
274  // Callback queue for handling client requests and disconnections.
275  _handlerCallbackQueue = std::make_unique<CallbackQueue>(_logger, /*numThreads=*/1ul);
276 }
277 
278 template <typename ServerConfiguration>
280 
281 template <typename ServerConfiguration>
283  websocketpp::lib::asio::error_code ec;
284  _server.get_con_from_hdl(hdl)->get_raw_socket().set_option(Tcp::no_delay(true), ec);
285  if (ec) {
286  _server.get_elog().write(RECOVERABLE, "Failed to set TCP_NODELAY: " + ec.message());
287  }
288 }
289 
290 template <typename ServerConfiguration>
292  auto con = _server.get_con_from_hdl(hdl);
293 
294  const auto& subprotocols = con->get_requested_subprotocols();
295  if (std::find(subprotocols.begin(), subprotocols.end(), SUPPORTED_SUBPROTOCOL) !=
296  subprotocols.end()) {
297  con->select_subprotocol(SUPPORTED_SUBPROTOCOL);
298  return true;
299  }
300  _server.get_alog().write(APP, "Rejecting client " + remoteEndpointString(hdl) +
301  " which did not declare support for subprotocol " +
303  return false;
304 }
305 
306 template <typename ServerConfiguration>
308  auto con = _server.get_con_from_hdl(hdl);
309  const auto endpoint = remoteEndpointString(hdl);
310  _server.get_alog().write(APP, "Client " + endpoint + " connected via " + con->get_resource());
311 
312  {
313  std::unique_lock<std::shared_mutex> lock(_clientsMutex);
314  _clients.emplace(hdl, ClientInfo(endpoint, hdl));
315  }
316 
317  con->send(json({
318  {"op", "serverInfo"},
319  {"name", _name},
320  {"capabilities", _options.capabilities},
321  {"supportedEncodings", _options.supportedEncodings},
322  {"metadata", _options.metadata},
323  {"sessionId", _options.sessionId},
324  })
325  .dump());
326 
327  std::vector<Channel> channels;
328  {
329  std::shared_lock<std::shared_mutex> lock(_channelsMutex);
330  for (const auto& [id, channel] : _channels) {
331  (void)id;
332  channels.push_back(channel);
333  }
334  }
335  sendJson(hdl, {
336  {"op", "advertise"},
337  {"channels", std::move(channels)},
338  });
339 
340  std::vector<Service> services;
341  {
342  std::shared_lock<std::shared_mutex> lock(_servicesMutex);
343  for (const auto& [id, service] : _services) {
344  services.push_back(Service(service, id));
345  }
346  }
347  sendJson(hdl, {
348  {"op", "advertiseServices"},
349  {"services", std::move(services)},
350  });
351 }
352 
353 template <typename ServerConfiguration>
355  std::unordered_map<ChannelId, SubscriptionId> oldSubscriptionsByChannel;
356  std::unordered_set<ClientChannelId> oldAdvertisedChannels;
357  std::string clientName;
358  bool wasSubscribedToConnectionGraph;
359  {
360  std::unique_lock<std::shared_mutex> lock(_clientsMutex);
361  const auto clientIt = _clients.find(hdl);
362  if (clientIt == _clients.end()) {
363  _server.get_elog().write(RECOVERABLE, "Client " + remoteEndpointString(hdl) +
364  " disconnected but not found in _clients");
365  return;
366  }
367 
368  const auto& client = clientIt->second;
369  clientName = client.name;
370  _server.get_alog().write(APP, "Client " + clientName + " disconnected");
371 
372  oldSubscriptionsByChannel = std::move(client.subscriptionsByChannel);
373  oldAdvertisedChannels = std::move(client.advertisedChannels);
374  wasSubscribedToConnectionGraph = client.subscribedToConnectionGraph;
375  _clients.erase(clientIt);
376  }
377 
378  // Unadvertise all channels this client advertised
379  for (const auto clientChannelId : oldAdvertisedChannels) {
380  _server.get_alog().write(APP, "Client " + clientName + " unadvertising channel " +
381  std::to_string(clientChannelId) + " due to disconnect");
382  if (_handlers.clientUnadvertiseHandler) {
383  try {
384  _handlers.clientUnadvertiseHandler(clientChannelId, hdl);
385  } catch (const std::exception& ex) {
386  _server.get_elog().write(
387  RECOVERABLE, "Exception caught when closing connection: " + std::string(ex.what()));
388  } catch (...) {
389  _server.get_elog().write(RECOVERABLE, "Exception caught when closing connection");
390  }
391  }
392  }
393 
394  {
395  std::unique_lock<std::shared_mutex> lock(_clientChannelsMutex);
396  _clientChannels.erase(hdl);
397  }
398 
399  // Unsubscribe all channels this client subscribed to
400  if (_handlers.unsubscribeHandler) {
401  for (const auto& [chanId, subs] : oldSubscriptionsByChannel) {
402  (void)subs;
403  try {
404  _handlers.unsubscribeHandler(chanId, hdl);
405  } catch (const std::exception& ex) {
406  _server.get_elog().write(
407  RECOVERABLE, "Exception caught when closing connection: " + std::string(ex.what()));
408  } catch (...) {
409  _server.get_elog().write(RECOVERABLE, "Exception caught when closing connection");
410  }
411  }
412  }
413 
414  // Unsubscribe from parameters this client subscribed to
415  std::unordered_set<std::string> clientSubscribedParameters;
416  {
417  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
418  clientSubscribedParameters = _clientParamSubscriptions[hdl];
419  _clientParamSubscriptions.erase(hdl);
420  }
421  unsubscribeParamsWithoutSubscriptions(hdl, clientSubscribedParameters);
422 
423  if (wasSubscribedToConnectionGraph) {
424  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
425  _connectionGraph.subscriptionCount--;
426  if (_connectionGraph.subscriptionCount == 0 && _handlers.subscribeConnectionGraphHandler) {
427  _server.get_alog().write(APP, "Unsubscribing from connection graph updates.");
428  try {
429  _handlers.subscribeConnectionGraphHandler(false);
430  } catch (const std::exception& ex) {
431  _server.get_elog().write(
432  RECOVERABLE, "Exception caught when closing connection: " + std::string(ex.what()));
433  } catch (...) {
434  _server.get_elog().write(RECOVERABLE, "Exception caught when closing connection");
435  }
436  }
437  }
438 
439 } // namespace foxglove
440 
441 template <typename ServerConfiguration>
443  _handlers = handlers;
444 }
445 
446 template <typename ServerConfiguration>
448  if (_server.stopped()) {
449  return;
450  }
451 
452  _server.get_alog().write(APP, "Stopping WebSocket server");
453  websocketpp::lib::error_code ec;
454 
455  _server.stop_perpetual();
456 
457  if (_server.is_listening()) {
458  _server.stop_listening(ec);
459  if (ec) {
460  _server.get_elog().write(RECOVERABLE, "Failed to stop listening: " + ec.message());
461  }
462  }
463 
464  std::vector<std::shared_ptr<ConnectionType>> connections;
465  {
466  std::shared_lock<std::shared_mutex> lock(_clientsMutex);
467  connections.reserve(_clients.size());
468  for (const auto& [hdl, client] : _clients) {
469  (void)client;
470  if (auto connection = _server.get_con_from_hdl(hdl, ec)) {
471  connections.push_back(connection);
472  }
473  }
474  }
475 
476  if (!connections.empty()) {
477  _server.get_alog().write(
478  APP, "Closing " + std::to_string(connections.size()) + " client connection(s)");
479 
480  // Iterate over all client connections and start the close connection handshake
481  for (const auto& connection : connections) {
482  connection->close(websocketpp::close::status::going_away, "server shutdown", ec);
483  if (ec) {
484  _server.get_elog().write(RECOVERABLE, "Failed to close connection: " + ec.message());
485  }
486  }
487 
488  // Wait for all connections to close
489  constexpr size_t MAX_SHUTDOWN_MS = 1000;
490  constexpr size_t SLEEP_MS = 10;
491  size_t durationMs = 0;
492  while (!_server.stopped() && durationMs < MAX_SHUTDOWN_MS) {
493  std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
494  _server.poll_one();
495  durationMs += SLEEP_MS;
496  }
497 
498  if (!_server.stopped()) {
499  _server.get_elog().write(RECOVERABLE, "Failed to close all connections, forcefully stopping");
500  for (const auto& hdl : connections) {
501  if (auto con = _server.get_con_from_hdl(hdl, ec)) {
502  _server.get_elog().write(RECOVERABLE,
503  "Terminating connection to " + remoteEndpointString(hdl));
504  con->terminate(ec);
505  }
506  }
507  _server.stop();
508  }
509  }
510 
511  _server.get_alog().write(APP, "All WebSocket connections closed");
512 
513  if (_serverThread) {
514  _server.get_alog().write(APP, "Waiting for WebSocket server run loop to terminate");
515  _serverThread->join();
516  _serverThread.reset();
517  _server.get_alog().write(APP, "WebSocket server run loop terminated");
518  }
519 
520  std::unique_lock<std::shared_mutex> lock(_clientsMutex);
521  _clients.clear();
522 }
523 
524 template <typename ServerConfiguration>
525 inline void Server<ServerConfiguration>::start(const std::string& host, uint16_t port) {
526  if (_serverThread) {
527  throw std::runtime_error("Server already started");
528  }
529 
530  websocketpp::lib::error_code ec;
531 
532  _server.listen(host, std::to_string(port), ec);
533  if (ec) {
534  throw std::runtime_error("Failed to listen on port " + std::to_string(port) + ": " +
535  ec.message());
536  }
537 
538  _server.start_accept(ec);
539  if (ec) {
540  throw std::runtime_error("Failed to start accepting connections: " + ec.message());
541  }
542 
543  _serverThread = std::make_unique<std::thread>([this]() {
544  _server.get_alog().write(APP, "WebSocket server run loop started");
545  _server.run();
546  _server.get_alog().write(APP, "WebSocket server run loop stopped");
547  });
548 
549  if (!_server.is_listening()) {
550  throw std::runtime_error("WebSocket server failed to listen on port " + std::to_string(port));
551  }
552 
553  websocketpp::lib::asio::error_code asioEc;
554  auto endpoint = _server.get_local_endpoint(asioEc);
555  if (asioEc) {
556  throw std::runtime_error("Failed to resolve the local endpoint: " + asioEc.message());
557  }
558 
559  const std::string protocol = _options.useTls ? "wss" : "ws";
560  auto address = endpoint.address();
561  _server.get_alog().write(APP, "WebSocket server listening at " + protocol + "://" +
562  IPAddressToString(address) + ":" +
563  std::to_string(endpoint.port()));
564 }
565 
566 template <typename ServerConfiguration>
568  try {
569  _server.send(hdl, std::move(payload).dump(), OpCode::TEXT);
570  } catch (std::exception const& e) {
571  _server.get_elog().write(RECOVERABLE, e.what());
572  }
573 }
574 
575 template <typename ServerConfiguration>
576 inline void Server<ServerConfiguration>::sendJsonRaw(ConnHandle hdl, const std::string& payload) {
577  try {
578  _server.send(hdl, payload, OpCode::TEXT);
579  } catch (std::exception const& e) {
580  _server.get_elog().write(RECOVERABLE, e.what());
581  }
582 }
583 
584 template <typename ServerConfiguration>
585 inline void Server<ServerConfiguration>::sendBinary(ConnHandle hdl, const uint8_t* payload,
586  size_t payloadSize) {
587  try {
588  _server.send(hdl, payload, payloadSize, OpCode::BINARY);
589  } catch (std::exception const& e) {
590  _server.get_elog().write(RECOVERABLE, e.what());
591  }
592 }
593 
594 template <typename ServerConfiguration>
596  const StatusLevel level,
597  const std::string& message) {
598  const std::string endpoint = remoteEndpointString(clientHandle);
599  const std::string logMessage = endpoint + ": " + message;
600  const auto logLevel = StatusLevelToLogLevel(level);
601  auto logger = level == StatusLevel::Info ? _server.get_alog() : _server.get_elog();
602  logger.write(logLevel, logMessage);
603 
604  sendJson(clientHandle, json{
605  {"op", "status"},
606  {"level", static_cast<uint8_t>(level)},
607  {"message", message},
608  });
609 }
610 
611 template <typename ServerConfiguration>
613  const OpCode op = msg->get_opcode();
614  try {
615  if (op == OpCode::TEXT) {
616  handleTextMessage(hdl, msg);
617  } else if (op == OpCode::BINARY) {
618  handleBinaryMessage(hdl, msg);
619  }
620  } catch (const std::exception& e) {
621  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
622  } catch (...) {
623  sendStatusAndLogMsg(hdl, StatusLevel::Error,
624  "Exception occurred when executing message handler");
625  }
626 }
627 
628 template <typename ServerConfiguration>
630  const json payload = json::parse(msg->get_payload());
631  const std::string& op = payload.at("op").get<std::string>();
632 
633  const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_OPERATION.find(op);
634  if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_OPERATION.end() &&
635  !hasCapability(requiredCapabilityIt->second)) {
636  sendStatusAndLogMsg(hdl, StatusLevel::Error,
637  "Operation '" + op + "' not supported as server capability '" +
638  requiredCapabilityIt->second + "' is missing");
639  return;
640  }
641 
642  if (!hasHandler(StringHash(op))) {
643  sendStatusAndLogMsg(
644  hdl, StatusLevel::Error,
645  "Operation '" + op + "' not supported as server handler function is missing");
646  return;
647  }
648 
649  try {
650  switch (StringHash(op)) {
651  case SUBSCRIBE:
652  handleSubscribe(payload, hdl);
653  break;
654  case UNSUBSCRIBE:
655  handleUnsubscribe(payload, hdl);
656  break;
657  case ADVERTISE:
658  handleAdvertise(payload, hdl);
659  break;
660  case UNADVERTISE:
661  handleUnadvertise(payload, hdl);
662  break;
663  case GET_PARAMETERS:
664  handleGetParameters(payload, hdl);
665  break;
666  case SET_PARAMETERS:
667  handleSetParameters(payload, hdl);
668  break;
669  case SUBSCRIBE_PARAMETER_UPDATES:
670  handleSubscribeParameterUpdates(payload, hdl);
671  break;
672  case UNSUBSCRIBE_PARAMETER_UPDATES:
673  handleUnsubscribeParameterUpdates(payload, hdl);
674  break;
675  case SUBSCRIBE_CONNECTION_GRAPH:
676  handleSubscribeConnectionGraph(hdl);
677  break;
678  case UNSUBSCRIBE_CONNECTION_GRAPH:
679  handleUnsubscribeConnectionGraph(hdl);
680  break;
681  case FETCH_ASSET:
682  handleFetchAsset(payload, hdl);
683  break;
684  default:
685  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Unrecognized client opcode \"" + op + "\"");
686  break;
687  }
688  } catch (const ExeptionWithId<uint32_t>& e) {
689  const std::string postfix = " (op: " + op + ", id: " + std::to_string(e.id()) + ")";
690  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what() + postfix);
691  } catch (const std::exception& e) {
692  const std::string postfix = " (op: " + op + ")";
693  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what() + postfix);
694  } catch (...) {
695  const std::string postfix = " (op: " + op + ")";
696  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Failed to execute handler" + postfix);
697  }
698 } // namespace foxglove
699 
700 template <typename ServerConfiguration>
702  const auto& payload = msg->get_payload();
703  const uint8_t* data = reinterpret_cast<const uint8_t*>(payload.data());
704  const size_t length = payload.size();
705 
706  if (length < 1) {
707  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Received an empty binary message");
708  return;
709  }
710 
711  const auto op = static_cast<ClientBinaryOpcode>(data[0]);
712 
713  const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_BINARY_OPERATION.find(op);
714  if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_BINARY_OPERATION.end() &&
715  !hasCapability(requiredCapabilityIt->second)) {
716  sendStatusAndLogMsg(hdl, StatusLevel::Error,
717  "Binary operation '" + std::to_string(static_cast<int>(op)) +
718  "' not supported as server capability '" + requiredCapabilityIt->second +
719  "' is missing");
720  return;
721  }
722 
723  switch (op) {
725  if (!_handlers.clientMessageHandler) {
726  return;
727  }
728 
729  if (length < 5) {
730  sendStatusAndLogMsg(hdl, StatusLevel::Error,
731  "Invalid message length " + std::to_string(length));
732  return;
733  }
734  const auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
735  std::chrono::high_resolution_clock::now().time_since_epoch())
736  .count();
737  const ClientChannelId channelId = *reinterpret_cast<const ClientChannelId*>(data + 1);
738  std::shared_lock<std::shared_mutex> lock(_clientChannelsMutex);
739 
740  auto clientPublicationsIt = _clientChannels.find(hdl);
741  if (clientPublicationsIt == _clientChannels.end()) {
742  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels");
743  return;
744  }
745 
746  auto& clientPublications = clientPublicationsIt->second;
747  const auto& channelIt = clientPublications.find(channelId);
748  if (channelIt == clientPublications.end()) {
749  sendStatusAndLogMsg(hdl, StatusLevel::Error,
750  "Channel " + std::to_string(channelId) + " is not advertised");
751  return;
752  }
753 
754  try {
755  const auto& advertisement = channelIt->second;
756  const uint32_t sequence = 0;
757  const ClientMessage clientMessage{static_cast<uint64_t>(timestamp),
758  static_cast<uint64_t>(timestamp),
759  sequence,
760  advertisement,
761  length,
762  data};
763  _handlers.clientMessageHandler(clientMessage, hdl);
764  } catch (const ClientChannelError& e) {
765  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
766  } catch (...) {
767  sendStatusAndLogMsg(hdl, StatusLevel::Error, "clientMessage: Failed to execute handler");
768  }
769  } break;
771  ServiceRequest request;
772  if (length < request.size()) {
773  const std::string errMessage =
774  "Invalid service call request length " + std::to_string(length);
775  sendServiceFailure(hdl, request.serviceId, request.callId, errMessage);
776  _server.get_elog().write(RECOVERABLE, errMessage);
777  return;
778  }
779 
780  request.read(data + 1, length - 1);
781 
782  {
783  std::shared_lock<std::shared_mutex> lock(_servicesMutex);
784  if (_services.find(request.serviceId) == _services.end()) {
785  const std::string errMessage =
786  "Service " + std::to_string(request.serviceId) + " is not advertised";
787  sendServiceFailure(hdl, request.serviceId, request.callId, errMessage);
788  _server.get_elog().write(RECOVERABLE, errMessage);
789  return;
790  }
791  }
792 
793  try {
794  if (!_handlers.serviceRequestHandler) {
795  throw foxglove::ServiceError(request.serviceId, "No service handler");
796  }
797 
798  _handlers.serviceRequestHandler(request, hdl);
799  } catch (const std::exception& e) {
800  sendServiceFailure(hdl, request.serviceId, request.callId, e.what());
801  _server.get_elog().write(RECOVERABLE, e.what());
802  }
803  } break;
804  default: {
805  sendStatusAndLogMsg(hdl, StatusLevel::Error,
806  "Unrecognized client opcode " + std::to_string(uint8_t(op)));
807  } break;
808  }
809 }
810 
811 template <typename ServerConfiguration>
812 inline std::vector<ChannelId> Server<ServerConfiguration>::addChannels(
813  const std::vector<ChannelWithoutId>& channels) {
814  if (channels.empty()) {
815  return {};
816  }
817 
818  std::vector<ChannelId> channelIds;
819  channelIds.reserve(channels.size());
820  json::array_t channelsJson;
821 
822  {
823  std::unique_lock<std::shared_mutex> lock(_channelsMutex);
824  for (const auto& channelWithoutId : channels) {
825  const auto newId = ++_nextChannelId;
826  channelIds.push_back(newId);
827  Channel newChannel{newId, channelWithoutId};
828  channelsJson.push_back(newChannel);
829  _channels.emplace(newId, std::move(newChannel));
830  }
831  }
832 
833  const auto msg = json{{"op", "advertise"}, {"channels", channelsJson}}.dump();
834  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
835  for (const auto& [hdl, clientInfo] : _clients) {
836  (void)clientInfo;
837  sendJsonRaw(hdl, msg);
838  }
839 
840  return channelIds;
841 }
842 
843 template <typename ServerConfiguration>
844 inline void Server<ServerConfiguration>::removeChannels(const std::vector<ChannelId>& channelIds) {
845  if (channelIds.empty()) {
846  return;
847  }
848 
849  {
850  std::unique_lock<std::shared_mutex> channelsLock(_channelsMutex);
851  for (auto channelId : channelIds) {
852  _channels.erase(channelId);
853  }
854  }
855 
856  const auto msg = json{{"op", "unadvertise"}, {"channelIds", channelIds}}.dump();
857 
858  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
859  for (auto& [hdl, clientInfo] : _clients) {
860  for (auto channelId : channelIds) {
861  if (const auto it = clientInfo.subscriptionsByChannel.find(channelId);
862  it != clientInfo.subscriptionsByChannel.end()) {
863  clientInfo.subscriptionsByChannel.erase(it);
864  }
865  }
866  sendJsonRaw(hdl, msg);
867  }
868 }
869 
870 template <typename ServerConfiguration>
872  ConnHandle hdl, const std::vector<Parameter>& parameters,
873  const std::optional<std::string>& requestId) {
874  // Filter out parameters which are not set.
875  std::vector<Parameter> nonEmptyParameters;
876  std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(nonEmptyParameters),
877  [](const auto& p) {
878  return p.getType() != ParameterType::PARAMETER_NOT_SET;
879  });
880 
881  nlohmann::json jsonPayload{{"op", "parameterValues"}, {"parameters", nonEmptyParameters}};
882  if (requestId) {
883  jsonPayload["id"] = requestId.value();
884  }
885  sendJsonRaw(hdl, jsonPayload.dump());
886 }
887 
888 template <typename ServerConfiguration>
890  const std::vector<Parameter>& parameters) {
891  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
892  for (const auto& clientParamSubscriptions : _clientParamSubscriptions) {
893  std::vector<Parameter> paramsToSendToClient;
894 
895  // Only consider parameters that are subscribed by the client
896  std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(paramsToSendToClient),
897  [clientParamSubscriptions](const Parameter& param) {
898  return clientParamSubscriptions.second.find(param.getName()) !=
899  clientParamSubscriptions.second.end();
900  });
901 
902  if (!paramsToSendToClient.empty()) {
903  publishParameterValues(clientParamSubscriptions.first, paramsToSendToClient);
904  }
905  }
906 }
907 
908 template <typename ServerConfiguration>
909 inline std::vector<ServiceId> Server<ServerConfiguration>::addServices(
910  const std::vector<ServiceWithoutId>& services) {
911  if (services.empty()) {
912  return {};
913  }
914 
915  std::unique_lock<std::shared_mutex> lock(_servicesMutex);
916  std::vector<ServiceId> serviceIds;
917  json newServices;
918  for (const auto& service : services) {
919  const ServiceId serviceId = ++_nextServiceId;
920  _services.emplace(serviceId, service);
921  serviceIds.push_back(serviceId);
922  newServices.push_back(Service(service, serviceId));
923  }
924 
925  const auto msg = json{{"op", "advertiseServices"}, {"services", std::move(newServices)}}.dump();
926  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
927  for (const auto& [hdl, clientInfo] : _clients) {
928  (void)clientInfo;
929  sendJsonRaw(hdl, msg);
930  }
931 
932  return serviceIds;
933 }
934 
935 template <typename ServerConfiguration>
936 inline void Server<ServerConfiguration>::removeServices(const std::vector<ServiceId>& serviceIds) {
937  std::unique_lock<std::shared_mutex> lock(_servicesMutex);
938  std::vector<ServiceId> removedServices;
939  for (const auto& serviceId : serviceIds) {
940  if (const auto it = _services.find(serviceId); it != _services.end()) {
941  _services.erase(it);
942  removedServices.push_back(serviceId);
943  }
944  }
945 
946  if (!removedServices.empty()) {
947  const auto msg =
948  json{{"op", "unadvertiseServices"}, {"serviceIds", std::move(removedServices)}}.dump();
949  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
950  for (const auto& [hdl, clientInfo] : _clients) {
951  (void)clientInfo;
952  sendJsonRaw(hdl, msg);
953  }
954  }
955 }
956 
957 template <typename ServerConfiguration>
959  uint64_t timestamp, const uint8_t* payload,
960  size_t payloadSize) {
961  websocketpp::lib::error_code ec;
962  const auto con = _server.get_con_from_hdl(clientHandle, ec);
963  if (ec || !con) {
964  return;
965  }
966 
967  const auto bufferSizeinBytes = con->get_buffered_amount();
968  if (bufferSizeinBytes + payloadSize >= _options.sendBufferLimitBytes) {
969  const auto logFn = [this, clientHandle]() {
970  sendStatusAndLogMsg(clientHandle, StatusLevel::Warning, "Send buffer limit reached");
971  };
972  FOXGLOVE_DEBOUNCE(logFn, 2500);
973  return;
974  }
975 
976  SubscriptionId subId = std::numeric_limits<SubscriptionId>::max();
977 
978  {
979  std::shared_lock<std::shared_mutex> lock(_clientsMutex);
980  const auto clientHandleAndInfoIt = _clients.find(clientHandle);
981  if (clientHandleAndInfoIt == _clients.end()) {
982  return; // Client got removed in the meantime.
983  }
984 
985  const auto& client = clientHandleAndInfoIt->second;
986  const auto& subs = client.subscriptionsByChannel.find(chanId);
987  if (subs == client.subscriptionsByChannel.end()) {
988  return; // Client not subscribed to this channel.
989  }
990  subId = subs->second;
991  }
992 
993  std::array<uint8_t, 1 + 4 + 8> msgHeader;
994  msgHeader[0] = uint8_t(BinaryOpcode::MESSAGE_DATA);
995  foxglove::WriteUint32LE(msgHeader.data() + 1, subId);
996  foxglove::WriteUint64LE(msgHeader.data() + 5, timestamp);
997 
998  const size_t messageSize = msgHeader.size() + payloadSize;
999  auto message = con->get_message(OpCode::BINARY, messageSize);
1000  message->set_compressed(_options.useCompression);
1001 
1002  message->set_payload(msgHeader.data(), msgHeader.size());
1003  message->append_payload(payload, payloadSize);
1004  con->send(message);
1005 }
1006 
1007 template <typename ServerConfiguration>
1008 inline void Server<ServerConfiguration>::broadcastTime(uint64_t timestamp) {
1009  std::array<uint8_t, 1 + 8> message;
1010  message[0] = uint8_t(BinaryOpcode::TIME_DATA);
1011  foxglove::WriteUint64LE(message.data() + 1, timestamp);
1012 
1013  std::shared_lock<std::shared_mutex> lock(_clientsMutex);
1014  for (const auto& [hdl, clientInfo] : _clients) {
1015  (void)clientInfo;
1016  sendBinary(hdl, message.data(), message.size());
1017  }
1018 }
1019 
1020 template <typename ServerConfiguration>
1022  const ServiceResponse& response) {
1023  std::vector<uint8_t> payload(1 + response.size());
1024  payload[0] = uint8_t(BinaryOpcode::SERVICE_CALL_RESPONSE);
1025  response.write(payload.data() + 1);
1026  sendBinary(clientHandle, payload.data(), payload.size());
1027 }
1028 
1029 template <typename ServerConfiguration>
1031  websocketpp::lib::asio::error_code ec;
1032  auto endpoint = _server.get_local_endpoint(ec);
1033  if (ec) {
1034  throw std::runtime_error("Server not listening on any port. Has it been started before?");
1035  }
1036  return endpoint.port();
1037 }
1038 
1039 template <typename ServerConfiguration>
1041  ServiceId serviceId, uint32_t callId,
1042  const std::string& message) {
1043  sendJson(clientHandle, json{{"op", "serviceCallFailure"},
1044  {"serviceId", serviceId},
1045  {"callId", callId},
1046  {"message", message}});
1047 }
1048 
1049 template <typename ServerConfiguration>
1051  const MapOfSets& publishedTopics, const MapOfSets& subscribedTopics,
1052  const MapOfSets& advertisedServices) {
1053  json::array_t publisherDiff, subscriberDiff, servicesDiff;
1054  std::unordered_set<std::string> topicNames, serviceNames;
1055  std::unordered_set<std::string> knownTopicNames, knownServiceNames;
1056  {
1057  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1058  for (const auto& [name, publisherIds] : publishedTopics) {
1059  const auto it = _connectionGraph.publishedTopics.find(name);
1060  if (it == _connectionGraph.publishedTopics.end() ||
1061  _connectionGraph.publishedTopics[name] != publisherIds) {
1062  publisherDiff.push_back(nlohmann::json{{"name", name}, {"publisherIds", publisherIds}});
1063  }
1064  topicNames.insert(name);
1065  }
1066  for (const auto& [name, subscriberIds] : subscribedTopics) {
1067  const auto it = _connectionGraph.subscribedTopics.find(name);
1068  if (it == _connectionGraph.subscribedTopics.end() ||
1069  _connectionGraph.subscribedTopics[name] != subscriberIds) {
1070  subscriberDiff.push_back(nlohmann::json{{"name", name}, {"subscriberIds", subscriberIds}});
1071  }
1072  topicNames.insert(name);
1073  }
1074  for (const auto& [name, providerIds] : advertisedServices) {
1075  const auto it = _connectionGraph.advertisedServices.find(name);
1076  if (it == _connectionGraph.advertisedServices.end() ||
1077  _connectionGraph.advertisedServices[name] != providerIds) {
1078  servicesDiff.push_back(nlohmann::json{{"name", name}, {"providerIds", providerIds}});
1079  }
1080  serviceNames.insert(name);
1081  }
1082 
1083  for (const auto& nameWithIds : _connectionGraph.publishedTopics) {
1084  knownTopicNames.insert(nameWithIds.first);
1085  }
1086  for (const auto& nameWithIds : _connectionGraph.subscribedTopics) {
1087  knownTopicNames.insert(nameWithIds.first);
1088  }
1089  for (const auto& nameWithIds : _connectionGraph.advertisedServices) {
1090  knownServiceNames.insert(nameWithIds.first);
1091  }
1092 
1093  _connectionGraph.publishedTopics = publishedTopics;
1094  _connectionGraph.subscribedTopics = subscribedTopics;
1095  _connectionGraph.advertisedServices = advertisedServices;
1096  }
1097 
1098  std::vector<std::string> removedTopics, removedServices;
1099  std::copy_if(knownTopicNames.begin(), knownTopicNames.end(), std::back_inserter(removedTopics),
1100  [&topicNames](const std::string& topic) {
1101  return topicNames.find(topic) == topicNames.end();
1102  });
1103  std::copy_if(knownServiceNames.begin(), knownServiceNames.end(),
1104  std::back_inserter(removedServices), [&serviceNames](const std::string& service) {
1105  return serviceNames.find(service) == serviceNames.end();
1106  });
1107 
1108  if (publisherDiff.empty() && subscriberDiff.empty() && servicesDiff.empty() &&
1109  removedTopics.empty() && removedServices.empty()) {
1110  return;
1111  }
1112 
1113  const json msg = {
1114  {"op", "connectionGraphUpdate"}, {"publishedTopics", publisherDiff},
1115  {"subscribedTopics", subscriberDiff}, {"advertisedServices", servicesDiff},
1116  {"removedTopics", removedTopics}, {"removedServices", removedServices},
1117  };
1118  const auto payload = msg.dump();
1119 
1120  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1121  for (const auto& [hdl, clientInfo] : _clients) {
1122  if (clientInfo.subscribedToConnectionGraph) {
1123  _server.send(hdl, payload, OpCode::TEXT);
1124  }
1125  }
1126 }
1127 
1128 template <typename ServerConfiguration>
1130  websocketpp::lib::error_code ec;
1131  const auto con = _server.get_con_from_hdl(clientHandle, ec);
1132  return con ? con->get_remote_endpoint() : "(unknown)";
1133 }
1134 
1135 template <typename ServerConfiguration>
1136 inline bool Server<ServerConfiguration>::isParameterSubscribed(const std::string& paramName) const {
1137  return std::find_if(_clientParamSubscriptions.begin(), _clientParamSubscriptions.end(),
1138  [paramName](const auto& paramSubscriptions) {
1139  return paramSubscriptions.second.find(paramName) !=
1140  paramSubscriptions.second.end();
1141  }) != _clientParamSubscriptions.end();
1142 }
1143 
1144 template <typename ServerConfiguration>
1146  ConnHandle hdl, const std::unordered_set<std::string>& paramNames) {
1147  std::vector<std::string> paramsToUnsubscribe;
1148  {
1149  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1150  std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToUnsubscribe),
1151  [this](const std::string& paramName) {
1152  return !isParameterSubscribed(paramName);
1153  });
1154  }
1155 
1156  if (_handlers.parameterSubscriptionHandler && !paramsToUnsubscribe.empty()) {
1157  for (const auto& param : paramsToUnsubscribe) {
1158  _server.get_alog().write(APP, "Unsubscribing from parameter '" + param + "'.");
1159  }
1160 
1161  try {
1162  _handlers.parameterSubscriptionHandler(paramsToUnsubscribe,
1164  } catch (const std::exception& e) {
1165  sendStatusAndLogMsg(hdl, StatusLevel::Error, e.what());
1166  } catch (...) {
1167  sendStatusAndLogMsg(hdl, StatusLevel::Error,
1168  "Failed to unsubscribe from one more more parameters");
1169  }
1170  }
1171 }
1172 
1173 template <typename ServerConfiguration>
1174 inline bool Server<ServerConfiguration>::hasCapability(const std::string& capability) const {
1175  return std::find(_options.capabilities.begin(), _options.capabilities.end(), capability) !=
1176  _options.capabilities.end();
1177 }
1178 
1179 template <typename ServerConfiguration>
1180 inline bool Server<ServerConfiguration>::hasHandler(uint32_t op) const {
1181  switch (op) {
1182  case SUBSCRIBE:
1183  return bool(_handlers.subscribeHandler);
1184  case UNSUBSCRIBE:
1185  return bool(_handlers.unsubscribeHandler);
1186  case ADVERTISE:
1187  return bool(_handlers.clientAdvertiseHandler);
1188  case UNADVERTISE:
1189  return bool(_handlers.clientUnadvertiseHandler);
1190  case GET_PARAMETERS:
1191  return bool(_handlers.parameterRequestHandler);
1192  case SET_PARAMETERS:
1193  return bool(_handlers.parameterChangeHandler);
1194  case SUBSCRIBE_PARAMETER_UPDATES:
1195  case UNSUBSCRIBE_PARAMETER_UPDATES:
1196  return bool(_handlers.parameterSubscriptionHandler);
1197  case SUBSCRIBE_CONNECTION_GRAPH:
1198  case UNSUBSCRIBE_CONNECTION_GRAPH:
1199  return bool(_handlers.subscribeConnectionGraphHandler);
1200  case FETCH_ASSET:
1201  return bool(_handlers.fetchAssetHandler);
1202  default:
1203  throw std::runtime_error("Unknown operation: " + std::to_string(op));
1204  }
1205 }
1206 
1207 template <typename ServerConfiguration>
1209  std::unordered_map<ChannelId, SubscriptionId> clientSubscriptionsByChannel;
1210  {
1211  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1212  clientSubscriptionsByChannel = _clients.at(hdl).subscriptionsByChannel;
1213  }
1214 
1215  const auto findSubscriptionBySubId =
1216  [](const std::unordered_map<ChannelId, SubscriptionId>& subscriptionsByChannel,
1217  SubscriptionId subId) {
1218  return std::find_if(subscriptionsByChannel.begin(), subscriptionsByChannel.end(),
1219  [&subId](const auto& mo) {
1220  return mo.second == subId;
1221  });
1222  };
1223 
1224  for (const auto& sub : payload.at("subscriptions")) {
1225  SubscriptionId subId = sub.at("id");
1226  ChannelId channelId = sub.at("channelId");
1227  if (findSubscriptionBySubId(clientSubscriptionsByChannel, subId) !=
1228  clientSubscriptionsByChannel.end()) {
1229  sendStatusAndLogMsg(hdl, StatusLevel::Error,
1230  "Client subscription id " + std::to_string(subId) +
1231  " was already used; ignoring subscription");
1232  continue;
1233  }
1234  const auto& channelIt = _channels.find(channelId);
1235  if (channelIt == _channels.end()) {
1236  sendStatusAndLogMsg(
1237  hdl, StatusLevel::Warning,
1238  "Channel " + std::to_string(channelId) + " is not available; ignoring subscription");
1239  continue;
1240  }
1241 
1242  _handlers.subscribeHandler(channelId, hdl);
1243  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1244  _clients.at(hdl).subscriptionsByChannel.emplace(channelId, subId);
1245  }
1246 }
1247 
1248 template <typename ServerConfiguration>
1250  std::unordered_map<ChannelId, SubscriptionId> clientSubscriptionsByChannel;
1251  {
1252  std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1253  clientSubscriptionsByChannel = _clients.at(hdl).subscriptionsByChannel;
1254  }
1255 
1256  const auto findSubscriptionBySubId =
1257  [](const std::unordered_map<ChannelId, SubscriptionId>& subscriptionsByChannel,
1258  SubscriptionId subId) {
1259  return std::find_if(subscriptionsByChannel.begin(), subscriptionsByChannel.end(),
1260  [&subId](const auto& mo) {
1261  return mo.second == subId;
1262  });
1263  };
1264 
1265  for (const auto& subIdJson : payload.at("subscriptionIds")) {
1266  SubscriptionId subId = subIdJson;
1267  const auto& sub = findSubscriptionBySubId(clientSubscriptionsByChannel, subId);
1268  if (sub == clientSubscriptionsByChannel.end()) {
1269  sendStatusAndLogMsg(hdl, StatusLevel::Warning,
1270  "Client subscription id " + std::to_string(subId) +
1271  " did not exist; ignoring unsubscription");
1272  continue;
1273  }
1274 
1275  ChannelId chanId = sub->first;
1276  _handlers.unsubscribeHandler(chanId, hdl);
1277  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1278  _clients.at(hdl).subscriptionsByChannel.erase(chanId);
1279  }
1280 }
1281 
1282 template <typename ServerConfiguration>
1284  std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
1285  auto [clientPublicationsIt, isFirstPublication] =
1286  _clientChannels.emplace(hdl, std::unordered_map<ClientChannelId, ClientAdvertisement>());
1287 
1288  auto& clientPublications = clientPublicationsIt->second;
1289 
1290  for (const auto& chan : payload.at("channels")) {
1291  ClientChannelId channelId = chan.at("id");
1292  if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) {
1293  sendStatusAndLogMsg(hdl, StatusLevel::Error,
1294  "Channel " + std::to_string(channelId) + " was already advertised");
1295  continue;
1296  }
1297 
1298  const auto topic = chan.at("topic").get<std::string>();
1299  if (!isWhitelisted(topic, _options.clientTopicWhitelistPatterns)) {
1300  sendStatusAndLogMsg(hdl, StatusLevel::Error,
1301  "Can't advertise channel " + std::to_string(channelId) + ", topic '" +
1302  topic + "' not whitelisted");
1303  continue;
1304  }
1305  ClientAdvertisement advertisement{};
1306  advertisement.channelId = channelId;
1307  advertisement.topic = topic;
1308  advertisement.encoding = chan.at("encoding").get<std::string>();
1309  advertisement.schemaName = chan.at("schemaName").get<std::string>();
1310 
1311  _handlers.clientAdvertiseHandler(advertisement, hdl);
1312  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1313  _clients.at(hdl).advertisedChannels.emplace(channelId);
1314  clientPublications.emplace(channelId, advertisement);
1315  }
1316 }
1317 
1318 template <typename ServerConfiguration>
1320  std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
1321  auto clientPublicationsIt = _clientChannels.find(hdl);
1322  if (clientPublicationsIt == _clientChannels.end()) {
1323  sendStatusAndLogMsg(hdl, StatusLevel::Error, "Client has no advertised channels");
1324  return;
1325  }
1326 
1327  auto& clientPublications = clientPublicationsIt->second;
1328 
1329  for (const auto& chanIdJson : payload.at("channelIds")) {
1330  ClientChannelId channelId = chanIdJson.get<ClientChannelId>();
1331  const auto& channelIt = clientPublications.find(channelId);
1332  if (channelIt == clientPublications.end()) {
1333  continue;
1334  }
1335 
1336  _handlers.clientUnadvertiseHandler(channelId, hdl);
1337  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1338  auto& clientInfo = _clients.at(hdl);
1339  clientPublications.erase(channelIt);
1340  const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId);
1341  if (advertisedChannelIt != clientInfo.advertisedChannels.end()) {
1342  clientInfo.advertisedChannels.erase(advertisedChannelIt);
1343  }
1344  }
1345 }
1346 
1347 template <typename ServerConfiguration>
1349  ConnHandle hdl) {
1350  const auto paramNames = payload.at("parameterNames").get<std::vector<std::string>>();
1351  const auto requestId = payload.find("id") == payload.end()
1352  ? std::nullopt
1353  : std::optional<std::string>(payload["id"].get<std::string>());
1354  _handlers.parameterRequestHandler(paramNames, requestId, hdl);
1355 }
1356 
1357 template <typename ServerConfiguration>
1359  ConnHandle hdl) {
1360  const auto parameters = payload.at("parameters").get<std::vector<Parameter>>();
1361  const auto requestId = payload.find("id") == payload.end()
1362  ? std::nullopt
1363  : std::optional<std::string>(payload["id"].get<std::string>());
1364  _handlers.parameterChangeHandler(parameters, requestId, hdl);
1365 }
1366 
1367 template <typename ServerConfiguration>
1369  ConnHandle hdl) {
1370  const auto paramNames = payload.at("parameterNames").get<std::unordered_set<std::string>>();
1371  std::vector<std::string> paramsToSubscribe;
1372  {
1373  // Only consider parameters that are not subscribed yet (by this or by other clients)
1374  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1375  std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToSubscribe),
1376  [this](const std::string& paramName) {
1377  return !isParameterSubscribed(paramName);
1378  });
1379 
1380  // Update the client's parameter subscriptions.
1381  auto& clientSubscribedParams = _clientParamSubscriptions[hdl];
1382  clientSubscribedParams.insert(paramNames.begin(), paramNames.end());
1383  }
1384 
1385  if (!paramsToSubscribe.empty()) {
1386  _handlers.parameterSubscriptionHandler(paramsToSubscribe,
1388  }
1389 }
1390 
1391 template <typename ServerConfiguration>
1393  ConnHandle hdl) {
1394  const auto paramNames = payload.at("parameterNames").get<std::unordered_set<std::string>>();
1395  {
1396  std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1397  auto& clientSubscribedParams = _clientParamSubscriptions[hdl];
1398  for (const auto& paramName : paramNames) {
1399  clientSubscribedParams.erase(paramName);
1400  }
1401  }
1402 
1403  unsubscribeParamsWithoutSubscriptions(hdl, paramNames);
1404 }
1405 
1406 template <typename ServerConfiguration>
1408  bool subscribeToConnnectionGraph = false;
1409  {
1410  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1411  _connectionGraph.subscriptionCount++;
1412  subscribeToConnnectionGraph = _connectionGraph.subscriptionCount == 1;
1413  }
1414 
1415  if (subscribeToConnnectionGraph) {
1416  // First subscriber, let the handler know that we are interested in updates.
1417  _server.get_alog().write(APP, "Subscribing to connection graph updates.");
1418  _handlers.subscribeConnectionGraphHandler(true);
1419  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1420  _clients.at(hdl).subscribedToConnectionGraph = true;
1421  }
1422 
1423  json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson;
1424  {
1425  std::shared_lock<std::shared_mutex> lock(_connectionGraphMutex);
1426  for (const auto& [name, ids] : _connectionGraph.publishedTopics) {
1427  publishedTopicsJson.push_back(nlohmann::json{{"name", name}, {"publisherIds", ids}});
1428  }
1429  for (const auto& [name, ids] : _connectionGraph.subscribedTopics) {
1430  subscribedTopicsJson.push_back(nlohmann::json{{"name", name}, {"subscriberIds", ids}});
1431  }
1432  for (const auto& [name, ids] : _connectionGraph.advertisedServices) {
1433  advertisedServicesJson.push_back(nlohmann::json{{"name", name}, {"providerIds", ids}});
1434  }
1435  }
1436 
1437  const json jsonMsg = {
1438  {"op", "connectionGraphUpdate"},
1439  {"publishedTopics", publishedTopicsJson},
1440  {"subscribedTopics", subscribedTopicsJson},
1441  {"advertisedServices", advertisedServicesJson},
1442  {"removedTopics", json::array()},
1443  {"removedServices", json::array()},
1444  };
1445 
1446  sendJsonRaw(hdl, jsonMsg.dump());
1447 }
1448 
1449 template <typename ServerConfiguration>
1451  bool clientWasSubscribed = false;
1452  {
1453  std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1454  auto& clientInfo = _clients.at(hdl);
1455  if (clientInfo.subscribedToConnectionGraph) {
1456  clientWasSubscribed = true;
1457  clientInfo.subscribedToConnectionGraph = false;
1458  }
1459  }
1460 
1461  if (clientWasSubscribed) {
1462  bool unsubscribeFromConnnectionGraph = false;
1463  {
1464  std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1465  _connectionGraph.subscriptionCount--;
1466  unsubscribeFromConnnectionGraph = _connectionGraph.subscriptionCount == 0;
1467  }
1468  if (unsubscribeFromConnnectionGraph) {
1469  _server.get_alog().write(APP, "Unsubscribing from connection graph updates.");
1470  _handlers.subscribeConnectionGraphHandler(false);
1471  }
1472  } else {
1473  sendStatusAndLogMsg(hdl, StatusLevel::Error,
1474  "Client was not subscribed to connection graph updates");
1475  }
1476 }
1477 
1478 template <typename ServerConfiguration>
1480  const auto uri = payload.at("uri").get<std::string>();
1481  const auto requestId = payload.at("requestId").get<uint32_t>();
1482  _handlers.fetchAssetHandler(uri, requestId, hdl);
1483 }
1484 
1485 template <typename ServerConfiguration>
1487  ConnHandle clientHandle, const FetchAssetResponse& response) {
1488  websocketpp::lib::error_code ec;
1489  const auto con = _server.get_con_from_hdl(clientHandle, ec);
1490  if (ec || !con) {
1491  return;
1492  }
1493 
1494  const size_t errMsgSize =
1495  response.status == FetchAssetStatus::Error ? response.errorMessage.size() : 0ul;
1496  const size_t dataSize = response.status == FetchAssetStatus::Success ? response.data.size() : 0ul;
1497  const size_t messageSize = 1 + 4 + 1 + 4 + errMsgSize + dataSize;
1498 
1499  auto message = con->get_message(OpCode::BINARY, messageSize);
1500 
1501  const auto op = BinaryOpcode::FETCH_ASSET_RESPONSE;
1502  message->append_payload(&op, 1);
1503 
1504  std::array<uint8_t, 4> uint32Data;
1505  foxglove::WriteUint32LE(uint32Data.data(), response.requestId);
1506  message->append_payload(uint32Data.data(), uint32Data.size());
1507 
1508  const uint8_t status = static_cast<uint8_t>(response.status);
1509  message->append_payload(&status, 1);
1510 
1511  foxglove::WriteUint32LE(uint32Data.data(), response.errorMessage.size());
1512  message->append_payload(uint32Data.data(), uint32Data.size());
1513  message->append_payload(response.errorMessage.data(), errMsgSize);
1514 
1515  message->append_payload(response.data.data(), dataSize);
1516  con->send(message);
1517 }
1518 
1519 } // namespace foxglove
foxglove::Server::handleMessage
void handleMessage(ConnHandle hdl, MessagePtr msg)
Definition: websocket_server.hpp:612
response
const std::string response
foxglove::Server::sendServiceResponse
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse &response) override
Definition: websocket_server.hpp:1021
foxglove::isWhitelisted
bool isWhitelisted(const std::string &name, const std::vector< std::regex > &regexPatterns)
Definition: regex_utils.hpp:10
foxglove::BinaryOpcode::FETCH_ASSET_RESPONSE
@ FETCH_ASSET_RESPONSE
foxglove::CAPABILITY_BY_CLIENT_BINARY_OPERATION
const std::unordered_map< ClientBinaryOpcode, std::string > CAPABILITY_BY_CLIENT_BINARY_OPERATION
Map of required capability by client operation (binary).
Definition: websocket_server.hpp:91
foxglove::Server::handleAdvertise
void handleAdvertise(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1283
foxglove::Server::handleSetParameters
void handleSetParameters(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1358
foxglove::Server::handleFetchAsset
void handleFetchAsset(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1479
foxglove::Server::isParameterSubscribed
bool isParameterSubscribed(const std::string &paramName) const
Definition: websocket_server.hpp:1136
foxglove::Server::_servicesMutex
std::shared_mutex _servicesMutex
Definition: websocket_server.hpp:196
foxglove::RECOVERABLE
static const websocketpp::log::level RECOVERABLE
Definition: websocket_server.hpp:73
foxglove
Definition: base64.hpp:8
foxglove::Server::handleSubscribe
void handleSubscribe(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1208
foxglove::StatusLevelToLogLevel
constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level)
Definition: websocket_server.hpp:102
foxglove::Server::publishedTopics
MapOfSets publishedTopics
Definition: websocket_server.hpp:201
foxglove::Server::_nextServiceId
ServiceId _nextServiceId
Definition: websocket_server.hpp:190
foxglove::Channel
Definition: common.hpp:64
foxglove::Server::_handlerCallbackQueue
std::unique_ptr< CallbackQueue > _handlerCallbackQueue
Definition: websocket_server.hpp:181
foxglove::Server::subscriptionCount
int subscriptionCount
Definition: websocket_server.hpp:200
foxglove::BinaryOpcode::MESSAGE_DATA
@ MESSAGE_DATA
foxglove::ClientChannelId
uint32_t ClientChannelId
Definition: common.hpp:27
foxglove::ClientBinaryOpcode::SERVICE_CALL_REQUEST
@ SERVICE_CALL_REQUEST
foxglove::Server::ClientInfo
Definition: websocket_server.hpp:158
foxglove::Server::handleSubscribeConnectionGraph
void handleSubscribeConnectionGraph(ConnHandle hdl)
Definition: websocket_server.hpp:1407
foxglove::Server::validateConnection
bool validateConnection(ConnHandle hdl)
Definition: websocket_server.hpp:291
foxglove::Server::_clientChannels
std::map< ConnHandle, std::unordered_map< ClientChannelId, ClientAdvertisement >, std::owner_less<> > _clientChannels
Definition: websocket_server.hpp:187
common.hpp
foxglove::Server::ClientInfo::handle
ConnHandle handle
Definition: websocket_server.hpp:160
foxglove::Server::sendServiceFailure
void sendServiceFailure(ConnHandle clientHandle, ServiceId serviceId, uint32_t callId, const std::string &message) override
Definition: websocket_server.hpp:1040
foxglove::Server::~Server
virtual ~Server()
Definition: websocket_server.hpp:279
foxglove::LogCallback
std::function< void(WebSocketLogLevel, char const *)> LogCallback
Definition: websocket_logging.hpp:12
foxglove::ServiceResponse
Definition: common.hpp:130
foxglove::Server::addServices
std::vector< ServiceId > addServices(const std::vector< ServiceWithoutId > &services) override
Definition: websocket_server.hpp:909
foxglove::WriteUint64LE
void WriteUint64LE(uint8_t *buf, uint64_t val)
Definition: serialization.hpp:12
foxglove::Server::_channelsMutex
std::shared_mutex _channelsMutex
Definition: websocket_server.hpp:194
foxglove::Server::_connectionGraph
struct foxglove::Server::@0 _connectionGraph
foxglove::Server::handleSubscribeParameterUpdates
void handleSubscribeParameterUpdates(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1368
foxglove::Server::handleConnectionClosed
void handleConnectionClosed(ConnHandle hdl)
Definition: websocket_server.hpp:354
foxglove::BinaryOpcode::SERVICE_CALL_RESPONSE
@ SERVICE_CALL_RESPONSE
foxglove::Server::setHandlers
void setHandlers(ServerHandlers< ConnHandle > &&handlers) override
Definition: websocket_server.hpp:442
foxglove::CAPABILITY_CLIENT_PUBLISH
constexpr char CAPABILITY_CLIENT_PUBLISH[]
Definition: common.hpp:13
foxglove::ServiceResponse::callId
uint32_t callId
Definition: common.hpp:132
foxglove::Server::broadcastTime
void broadcastTime(uint64_t timestamp) override
Definition: websocket_server.hpp:1008
foxglove::FetchAssetResponse
Definition: common.hpp:155
regex_utils.hpp
foxglove::Server::handleUnsubscribe
void handleUnsubscribe(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1249
foxglove::ServiceResponse::size
size_t size() const
Definition: common.hpp:136
foxglove::Server::_channels
std::unordered_map< ChannelId, Channel > _channels
Definition: websocket_server.hpp:185
foxglove::ClientChannelError
Definition: server_interface.hpp:38
foxglove::Server::operator=
Server & operator=(const Server &)=delete
foxglove::Server::handleGetParameters
void handleGetParameters(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1348
foxglove::StatusLevel::Error
@ Error
foxglove::Server::updateParameterValues
void updateParameterValues(const std::vector< Parameter > &parameters) override
Definition: websocket_server.hpp:889
foxglove::Server::_logger
LogCallback _logger
Definition: websocket_server.hpp:177
foxglove::BinaryOpcode::TIME_DATA
@ TIME_DATA
serialization.hpp
foxglove::Server::updateConnectionGraph
void updateConnectionGraph(const MapOfSets &publishedTopics, const MapOfSets &subscribedTopics, const MapOfSets &advertisedServices) override
Definition: websocket_server.hpp:1050
foxglove::Server::_clientParamSubscriptionsMutex
std::mutex _clientParamSubscriptionsMutex
Definition: websocket_server.hpp:197
foxglove::Server::removeChannels
void removeChannels(const std::vector< ChannelId > &channelIds) override
Definition: websocket_server.hpp:844
foxglove::Server::getPort
uint16_t getPort() override
Definition: websocket_server.hpp:1030
foxglove::SUPPORTED_SUBPROTOCOL
constexpr char SUPPORTED_SUBPROTOCOL[]
Definition: common.hpp:12
foxglove::Server::advertisedServices
MapOfSets advertisedServices
Definition: websocket_server.hpp:203
foxglove::CAPABILITY_PARAMETERS_SUBSCRIBE
constexpr char CAPABILITY_PARAMETERS_SUBSCRIBE[]
Definition: common.hpp:16
foxglove::ConnHandle
websocketpp::connection_hdl ConnHandle
Definition: websocket_server.hpp:68
foxglove::Server::setupTlsHandler
void setupTlsHandler()
foxglove::ServerOptions
Definition: server_interface.hpp:45
foxglove::Server::ClientInfo::name
std::string name
Definition: websocket_server.hpp:159
foxglove::Server::subscribedTopics
MapOfSets subscribedTopics
Definition: websocket_server.hpp:202
foxglove::ClientAdvertisement::channelId
ClientChannelId channelId
Definition: common.hpp:78
foxglove::ServerHandlers< ConnHandle >
foxglove::Server::sendMessage
void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp, const uint8_t *payload, size_t payloadSize) override
Definition: websocket_server.hpp:958
foxglove::ParameterSubscriptionOperation::SUBSCRIBE
@ SUBSCRIBE
foxglove::Parameter
Definition: parameter.hpp:54
foxglove::Server::_clientsMutex
std::shared_mutex _clientsMutex
Definition: websocket_server.hpp:193
foxglove::Server::_connectionGraphMutex
std::shared_mutex _connectionGraphMutex
Definition: websocket_server.hpp:205
foxglove::Server::socketInit
void socketInit(ConnHandle hdl)
Definition: websocket_server.hpp:282
foxglove::Server::_clientParamSubscriptions
std::map< ConnHandle, std::unordered_set< std::string >, std::owner_less<> > _clientParamSubscriptions
Definition: websocket_server.hpp:189
foxglove::ClientAdvertisement
Definition: common.hpp:77
foxglove::json
nlohmann::json json
Definition: websocket_server.hpp:66
foxglove::Server
Definition: websocket_server.hpp:116
foxglove::ClientMessage
Definition: common.hpp:85
foxglove::Server::unsubscribeParamsWithoutSubscriptions
void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl, const std::unordered_set< std::string > &paramNames)
Definition: websocket_server.hpp:1145
foxglove::Server::_services
std::unordered_map< ServiceId, ServiceWithoutId > _services
Definition: websocket_server.hpp:191
foxglove::IPAddressToString
std::string IPAddressToString(const websocketpp::lib::asio::ip::address &addr)
Definition: websocket_logging.hpp:14
foxglove::Server::start
void start(const std::string &host, uint16_t port) override
Definition: websocket_server.hpp:525
foxglove::Server::handleUnadvertise
void handleUnadvertise(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1319
foxglove::ServiceResponse::serviceId
ServiceId serviceId
Definition: common.hpp:131
foxglove::WriteUint32LE
void WriteUint32LE(uint8_t *buf, uint32_t val)
Definition: serialization.hpp:27
foxglove::Server::sendStatusAndLogMsg
void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level, const std::string &message)
Definition: websocket_server.hpp:595
foxglove::Server::_handlers
ServerHandlers< ConnHandle > _handlers
Definition: websocket_server.hpp:192
foxglove::ExeptionWithId
Definition: server_interface.hpp:21
foxglove::SubscriptionId
uint32_t SubscriptionId
Definition: common.hpp:28
foxglove::Server::ConnectionType
websocketpp::connection< ServerConfiguration > ConnectionType
Definition: websocket_server.hpp:119
foxglove::ExeptionWithId::id
IdType id() const
Definition: server_interface.hpp:27
foxglove::Server::handleUnsubscribeParameterUpdates
void handleUnsubscribeParameterUpdates(const nlohmann::json &payload, ConnHandle hdl)
Definition: websocket_server.hpp:1392
foxglove::Server::ClientInfo::advertisedChannels
std::unordered_set< ClientChannelId > advertisedChannels
Definition: websocket_server.hpp:162
foxglove::Server::_clients
std::map< ConnHandle, ClientInfo, std::owner_less<> > _clients
Definition: websocket_server.hpp:184
foxglove::Server::sendFetchAssetResponse
void sendFetchAssetResponse(ConnHandle clientHandle, const FetchAssetResponse &response) override
Definition: websocket_server.hpp:1486
foxglove::Server::ClientInfo::operator=
ClientInfo & operator=(const ClientInfo &)=delete
foxglove::APP
static const websocketpp::log::level APP
Definition: websocket_server.hpp:71
foxglove::Server::ClientInfo::ClientInfo
ClientInfo(const std::string &name, ConnHandle handle)
Definition: websocket_server.hpp:165
foxglove::StatusLevel::Warning
@ Warning
parameter.hpp
foxglove::CAPABILITY_BY_CLIENT_OPERATION
const std::unordered_map< std::string, std::string > CAPABILITY_BY_CLIENT_OPERATION
Map of required capability by client operation (text).
Definition: websocket_server.hpp:76
foxglove::Server::hasHandler
bool hasHandler(uint32_t op) const
Definition: websocket_server.hpp:1180
foxglove::ClientBinaryOpcode::MESSAGE_DATA
@ MESSAGE_DATA
foxglove::StatusLevel::Info
@ Info
foxglove::CAPABILITY_PARAMETERS
constexpr char CAPABILITY_PARAMETERS[]
Definition: common.hpp:15
foxglove::MapOfSets
std::unordered_map< std::string, std::unordered_set< std::string > > MapOfSets
Definition: server_interface.hpp:18
foxglove::ServerInterface
Definition: server_interface.hpp:80
foxglove::Server::handleConnectionOpened
void handleConnectionOpened(ConnHandle hdl)
Definition: websocket_server.hpp:307
foxglove::Server::hasCapability
bool hasCapability(const std::string &capability) const
Definition: websocket_server.hpp:1174
foxglove::Server::ClientInfo::subscribedToConnectionGraph
bool subscribedToConnectionGraph
Definition: websocket_server.hpp:163
foxglove::Server::ServerType
websocketpp::server< ServerConfiguration > ServerType
Definition: websocket_server.hpp:118
foxglove::Server::_name
std::string _name
Definition: websocket_server.hpp:176
foxglove::ClientBinaryOpcode
ClientBinaryOpcode
Definition: common.hpp:38
foxglove::Server::Tcp
websocketpp::lib::asio::ip::tcp Tcp
Definition: websocket_server.hpp:121
std
foxglove::Server::removeServices
void removeServices(const std::vector< ServiceId > &serviceIds) override
Definition: websocket_server.hpp:936
FOXGLOVE_DEBOUNCE
#define FOXGLOVE_DEBOUNCE(f, ms)
Definition: websocket_server.hpp:31
foxglove::Server::remoteEndpointString
std::string remoteEndpointString(ConnHandle clientHandle) override
Definition: websocket_server.hpp:1129
foxglove::Server::publishParameterValues
void publishParameterValues(ConnHandle clientHandle, const std::vector< Parameter > &parameters, const std::optional< std::string > &requestId=std::nullopt) override
Definition: websocket_server.hpp:871
foxglove::StatusLevel
StatusLevel
Definition: websocket_server.hpp:96
foxglove::WARNING
static const websocketpp::log::level WARNING
Definition: websocket_server.hpp:72
foxglove::Server::handleTextMessage
void handleTextMessage(ConnHandle hdl, MessagePtr msg)
Definition: websocket_server.hpp:629
foxglove::ServiceError
Definition: server_interface.hpp:41
callback_queue.hpp
param
T param(const std::string &param_name, const T &default_val)
foxglove::ChannelId
uint32_t ChannelId
Definition: common.hpp:26
foxglove::Server::_server
ServerType _server
Definition: websocket_server.hpp:179
websocket_logging.hpp
foxglove::FetchAssetStatus::Error
@ Error
foxglove::Server::sendJsonRaw
void sendJsonRaw(ConnHandle hdl, const std::string &payload)
Definition: websocket_server.hpp:576
foxglove::Server::handleBinaryMessage
void handleBinaryMessage(ConnHandle hdl, MessagePtr msg)
Definition: websocket_server.hpp:701
foxglove::Server::MessagePtr
typename ServerType::message_ptr MessagePtr
Definition: websocket_server.hpp:120
foxglove::CAPABILITY_SERVICES
constexpr char CAPABILITY_SERVICES[]
Definition: common.hpp:17
foxglove::CAPABILITY_ASSETS
constexpr char CAPABILITY_ASSETS[]
Definition: common.hpp:19
time_since_epoch
double time_since_epoch()
foxglove::Server::ClientInfo::subscriptionsByChannel
std::unordered_map< ChannelId, SubscriptionId > subscriptionsByChannel
Definition: websocket_server.hpp:161
foxglove::Server::_nextChannelId
uint32_t _nextChannelId
Definition: websocket_server.hpp:183
foxglove::Server::Server
Server(std::string name, LogCallback logger, const ServerOptions &options)
Definition: websocket_server.hpp:240
foxglove::Server::sendBinary
void sendBinary(ConnHandle hdl, const uint8_t *payload, size_t payloadSize)
Definition: websocket_server.hpp:585
foxglove::Server::_options
ServerOptions _options
Definition: websocket_server.hpp:178
foxglove::Server::_clientChannelsMutex
std::shared_mutex _clientChannelsMutex
Definition: websocket_server.hpp:195
foxglove::ParameterSubscriptionOperation::UNSUBSCRIBE
@ UNSUBSCRIBE
foxglove::OpCode
websocketpp::frame::opcode::value OpCode
Definition: websocket_client.hpp:30
foxglove::ServiceId
uint32_t ServiceId
Definition: common.hpp:29
foxglove::Server::sendJson
void sendJson(ConnHandle hdl, json &&payload)
Definition: websocket_server.hpp:567
foxglove::FetchAssetStatus::Success
@ Success
foxglove::Server::stop
void stop() override
Definition: websocket_server.hpp:447
foxglove::Server::addChannels
std::vector< ChannelId > addChannels(const std::vector< ChannelWithoutId > &channels) override
Definition: websocket_server.hpp:812
foxglove::Server::handleUnsubscribeConnectionGraph
void handleUnsubscribeConnectionGraph(ConnHandle hdl)
Definition: websocket_server.hpp:1450
foxglove::CAPABILITY_CONNECTION_GRAPH
constexpr char CAPABILITY_CONNECTION_GRAPH[]
Definition: common.hpp:18
server_interface.hpp
foxglove::ServiceResponse::read
void read(const uint8_t *data, size_t size)
Definition: serialization.cpp:143
foxglove::Server::_serverThread
std::unique_ptr< std::thread > _serverThread
Definition: websocket_server.hpp:180
foxglove::Service
Definition: common.hpp:121


foxglove_bridge
Author(s): Foxglove
autogenerated on Wed Mar 5 2025 03:34:31