10 #include <shared_mutex>
11 #include <string_view>
13 #include <unordered_map>
14 #include <unordered_set>
17 #include <nlohmann/json.hpp>
18 #include <websocketpp/config/asio.hpp>
19 #include <websocketpp/server.hpp>
31 #define FOXGLOVE_DEBOUNCE(f, ms) \
33 static auto last_call = std::chrono::system_clock::now(); \
34 const auto now = std::chrono::system_clock::now(); \
35 if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_call).count() > ms) { \
43 constexpr uint32_t StringHash(
const std::string_view str) {
44 uint32_t result = 0x811C9DC5;
46 result = (
static_cast<uint32_t
>(c) ^ result) * 0x01000193;
51 constexpr
auto SUBSCRIBE = StringHash(
"subscribe");
52 constexpr
auto UNSUBSCRIBE = StringHash(
"unsubscribe");
53 constexpr
auto ADVERTISE = StringHash(
"advertise");
54 constexpr
auto UNADVERTISE = StringHash(
"unadvertise");
55 constexpr
auto GET_PARAMETERS = StringHash(
"getParameters");
56 constexpr
auto SET_PARAMETERS = StringHash(
"setParameters");
57 constexpr
auto SUBSCRIBE_PARAMETER_UPDATES = StringHash(
"subscribeParameterUpdates");
58 constexpr
auto UNSUBSCRIBE_PARAMETER_UPDATES = StringHash(
"unsubscribeParameterUpdates");
59 constexpr
auto SUBSCRIBE_CONNECTION_GRAPH = StringHash(
"subscribeConnectionGraph");
60 constexpr
auto UNSUBSCRIBE_CONNECTION_GRAPH = StringHash(
"unsubscribeConnectionGraph");
61 constexpr
auto FETCH_ASSET = StringHash(
"fetchAsset");
69 using OpCode = websocketpp::frame::opcode::value;
71 static const websocketpp::log::level
APP = websocketpp::log::alevel::app;
72 static const websocketpp::log::level
WARNING = websocketpp::log::elevel::warn;
73 static const websocketpp::log::level
RECOVERABLE = websocketpp::log::elevel::rerror;
115 template <
typename ServerConfiguration>
121 using Tcp = websocketpp::lib::asio::ip::tcp;
131 void start(
const std::string& host, uint16_t port)
override;
132 void stop()
override;
134 std::vector<ChannelId>
addChannels(
const std::vector<ChannelWithoutId>& channels)
override;
135 void removeChannels(
const std::vector<ChannelId>& channelIds)
override;
137 const std::optional<std::string>& requestId = std::nullopt)
override;
139 std::vector<ServiceId>
addServices(
const std::vector<ServiceWithoutId>& services)
override;
140 void removeServices(
const std::vector<ServiceId>& serviceIds)
override;
145 const uint8_t* payload,
size_t payloadSize)
override;
149 const std::string& message)
override;
184 std::map<ConnHandle, ClientInfo, std::owner_less<>>
_clients;
186 std::map<ConnHandle, std::unordered_map<ClientChannelId, ClientAdvertisement>, std::owner_less<>>
188 std::map<ConnHandle, std::unordered_set<std::string>, std::owner_less<>>
191 std::unordered_map<ServiceId, ServiceWithoutId>
_services;
220 const std::string& message);
222 const std::unordered_set<std::string>& paramNames);
239 template <
typename ServerConfiguration>
242 : _name(
std::move(name))
244 , _options(options) {
249 websocketpp::lib::error_code ec;
252 throw std::runtime_error(
"Failed to initialize websocket server: " + ec.message());
255 _server.clear_access_channels(websocketpp::log::alevel::all);
272 _server.set_listen_backlog(128);
278 template <
typename ServerConfiguration>
281 template <
typename ServerConfiguration>
283 websocketpp::lib::asio::error_code ec;
284 _server.get_con_from_hdl(hdl)->get_raw_socket().set_option(Tcp::no_delay(
true), ec);
286 _server.get_elog().write(
RECOVERABLE,
"Failed to set TCP_NODELAY: " + ec.message());
290 template <
typename ServerConfiguration>
292 auto con = _server.get_con_from_hdl(hdl);
294 const auto& subprotocols = con->get_requested_subprotocols();
296 subprotocols.end()) {
300 _server.get_alog().write(
APP,
"Rejecting client " + remoteEndpointString(hdl) +
301 " which did not declare support for subprotocol " +
306 template <
typename ServerConfiguration>
308 auto con = _server.get_con_from_hdl(hdl);
309 const auto endpoint = remoteEndpointString(hdl);
310 _server.get_alog().write(
APP,
"Client " + endpoint +
" connected via " + con->get_resource());
313 std::unique_lock<std::shared_mutex> lock(_clientsMutex);
314 _clients.emplace(hdl,
ClientInfo(endpoint, hdl));
318 {
"op",
"serverInfo"},
320 {
"capabilities", _options.capabilities},
321 {
"supportedEncodings", _options.supportedEncodings},
322 {
"metadata", _options.metadata},
323 {
"sessionId", _options.sessionId},
327 std::vector<Channel> channels;
329 std::shared_lock<std::shared_mutex> lock(_channelsMutex);
330 for (
const auto& [
id, channel] : _channels) {
332 channels.push_back(channel);
337 {
"channels", std::move(channels)},
340 std::vector<Service> services;
342 std::shared_lock<std::shared_mutex> lock(_servicesMutex);
343 for (
const auto& [
id, service] : _services) {
344 services.push_back(
Service(service,
id));
348 {
"op",
"advertiseServices"},
349 {
"services", std::move(services)},
353 template <
typename ServerConfiguration>
355 std::unordered_map<ChannelId, SubscriptionId> oldSubscriptionsByChannel;
356 std::unordered_set<ClientChannelId> oldAdvertisedChannels;
357 std::string clientName;
358 bool wasSubscribedToConnectionGraph;
360 std::unique_lock<std::shared_mutex> lock(_clientsMutex);
361 const auto clientIt = _clients.find(hdl);
362 if (clientIt == _clients.end()) {
363 _server.get_elog().write(
RECOVERABLE,
"Client " + remoteEndpointString(hdl) +
364 " disconnected but not found in _clients");
368 const auto& client = clientIt->second;
369 clientName = client.name;
370 _server.get_alog().write(
APP,
"Client " + clientName +
" disconnected");
372 oldSubscriptionsByChannel = std::move(client.subscriptionsByChannel);
373 oldAdvertisedChannels = std::move(client.advertisedChannels);
374 wasSubscribedToConnectionGraph = client.subscribedToConnectionGraph;
375 _clients.erase(clientIt);
379 for (
const auto clientChannelId : oldAdvertisedChannels) {
380 _server.get_alog().write(
APP,
"Client " + clientName +
" unadvertising channel " +
381 std::to_string(clientChannelId) +
" due to disconnect");
382 if (_handlers.clientUnadvertiseHandler) {
384 _handlers.clientUnadvertiseHandler(clientChannelId, hdl);
385 }
catch (
const std::exception& ex) {
386 _server.get_elog().write(
387 RECOVERABLE,
"Exception caught when closing connection: " + std::string(ex.what()));
389 _server.get_elog().write(
RECOVERABLE,
"Exception caught when closing connection");
395 std::unique_lock<std::shared_mutex> lock(_clientChannelsMutex);
396 _clientChannels.erase(hdl);
400 if (_handlers.unsubscribeHandler) {
401 for (
const auto& [chanId, subs] : oldSubscriptionsByChannel) {
404 _handlers.unsubscribeHandler(chanId, hdl);
405 }
catch (
const std::exception& ex) {
406 _server.get_elog().write(
407 RECOVERABLE,
"Exception caught when closing connection: " + std::string(ex.what()));
409 _server.get_elog().write(
RECOVERABLE,
"Exception caught when closing connection");
415 std::unordered_set<std::string> clientSubscribedParameters;
417 std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
418 clientSubscribedParameters = _clientParamSubscriptions[hdl];
419 _clientParamSubscriptions.erase(hdl);
421 unsubscribeParamsWithoutSubscriptions(hdl, clientSubscribedParameters);
423 if (wasSubscribedToConnectionGraph) {
424 std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
425 _connectionGraph.subscriptionCount--;
426 if (_connectionGraph.subscriptionCount == 0 && _handlers.subscribeConnectionGraphHandler) {
427 _server.get_alog().write(
APP,
"Unsubscribing from connection graph updates.");
429 _handlers.subscribeConnectionGraphHandler(
false);
430 }
catch (
const std::exception& ex) {
431 _server.get_elog().write(
432 RECOVERABLE,
"Exception caught when closing connection: " + std::string(ex.what()));
434 _server.get_elog().write(
RECOVERABLE,
"Exception caught when closing connection");
441 template <
typename ServerConfiguration>
443 _handlers = handlers;
446 template <
typename ServerConfiguration>
448 if (_server.stopped()) {
452 _server.get_alog().write(
APP,
"Stopping WebSocket server");
453 websocketpp::lib::error_code ec;
455 _server.stop_perpetual();
457 if (_server.is_listening()) {
458 _server.stop_listening(ec);
460 _server.get_elog().write(
RECOVERABLE,
"Failed to stop listening: " + ec.message());
464 std::vector<std::shared_ptr<ConnectionType>> connections;
466 std::shared_lock<std::shared_mutex> lock(_clientsMutex);
467 connections.reserve(_clients.size());
468 for (
const auto& [hdl, client] : _clients) {
470 if (
auto connection = _server.get_con_from_hdl(hdl, ec)) {
471 connections.push_back(connection);
476 if (!connections.empty()) {
477 _server.get_alog().write(
478 APP,
"Closing " + std::to_string(connections.size()) +
" client connection(s)");
481 for (
const auto& connection : connections) {
482 connection->close(websocketpp::close::status::going_away,
"server shutdown", ec);
484 _server.get_elog().write(
RECOVERABLE,
"Failed to close connection: " + ec.message());
489 constexpr
size_t MAX_SHUTDOWN_MS = 1000;
490 constexpr
size_t SLEEP_MS = 10;
491 size_t durationMs = 0;
492 while (!_server.stopped() && durationMs < MAX_SHUTDOWN_MS) {
493 std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
495 durationMs += SLEEP_MS;
498 if (!_server.stopped()) {
499 _server.get_elog().write(
RECOVERABLE,
"Failed to close all connections, forcefully stopping");
500 for (
const auto& hdl : connections) {
501 if (
auto con = _server.get_con_from_hdl(hdl, ec)) {
503 "Terminating connection to " + remoteEndpointString(hdl));
511 _server.get_alog().write(
APP,
"All WebSocket connections closed");
514 _server.get_alog().write(
APP,
"Waiting for WebSocket server run loop to terminate");
515 _serverThread->join();
516 _serverThread.reset();
517 _server.get_alog().write(
APP,
"WebSocket server run loop terminated");
520 std::unique_lock<std::shared_mutex> lock(_clientsMutex);
524 template <
typename ServerConfiguration>
527 throw std::runtime_error(
"Server already started");
530 websocketpp::lib::error_code ec;
532 _server.listen(host, std::to_string(port), ec);
534 throw std::runtime_error(
"Failed to listen on port " + std::to_string(port) +
": " +
538 _server.start_accept(ec);
540 throw std::runtime_error(
"Failed to start accepting connections: " + ec.message());
543 _serverThread = std::make_unique<std::thread>([
this]() {
544 _server.get_alog().write(
APP,
"WebSocket server run loop started");
546 _server.get_alog().write(
APP,
"WebSocket server run loop stopped");
549 if (!_server.is_listening()) {
550 throw std::runtime_error(
"WebSocket server failed to listen on port " + std::to_string(port));
553 websocketpp::lib::asio::error_code asioEc;
554 auto endpoint = _server.get_local_endpoint(asioEc);
556 throw std::runtime_error(
"Failed to resolve the local endpoint: " + asioEc.message());
559 const std::string protocol = _options.useTls ?
"wss" :
"ws";
560 auto address = endpoint.address();
561 _server.get_alog().write(
APP,
"WebSocket server listening at " + protocol +
"://" +
563 std::to_string(endpoint.port()));
566 template <
typename ServerConfiguration>
569 _server.send(hdl, std::move(payload).dump(), OpCode::TEXT);
570 }
catch (std::exception
const& e) {
575 template <
typename ServerConfiguration>
578 _server.send(hdl, payload, OpCode::TEXT);
579 }
catch (std::exception
const& e) {
584 template <
typename ServerConfiguration>
586 size_t payloadSize) {
588 _server.send(hdl, payload, payloadSize, OpCode::BINARY);
589 }
catch (std::exception
const& e) {
594 template <
typename ServerConfiguration>
597 const std::string& message) {
598 const std::string endpoint = remoteEndpointString(clientHandle);
599 const std::string logMessage = endpoint +
": " + message;
601 auto logger = level ==
StatusLevel::Info ? _server.get_alog() : _server.get_elog();
602 logger.write(logLevel, logMessage);
604 sendJson(clientHandle,
json{
606 {
"level",
static_cast<uint8_t
>(level)},
607 {
"message", message},
611 template <
typename ServerConfiguration>
613 const OpCode op = msg->get_opcode();
615 if (op == OpCode::TEXT) {
616 handleTextMessage(hdl, msg);
617 }
else if (op == OpCode::BINARY) {
618 handleBinaryMessage(hdl, msg);
620 }
catch (
const std::exception& e) {
624 "Exception occurred when executing message handler");
628 template <
typename ServerConfiguration>
630 const json payload = json::parse(msg->get_payload());
631 const std::string& op = payload.at(
"op").get<std::string>();
635 !hasCapability(requiredCapabilityIt->second)) {
637 "Operation '" + op +
"' not supported as server capability '" +
638 requiredCapabilityIt->second +
"' is missing");
642 if (!hasHandler(StringHash(op))) {
645 "Operation '" + op +
"' not supported as server handler function is missing");
650 switch (StringHash(op)) {
652 handleSubscribe(payload, hdl);
655 handleUnsubscribe(payload, hdl);
658 handleAdvertise(payload, hdl);
661 handleUnadvertise(payload, hdl);
664 handleGetParameters(payload, hdl);
667 handleSetParameters(payload, hdl);
669 case SUBSCRIBE_PARAMETER_UPDATES:
670 handleSubscribeParameterUpdates(payload, hdl);
672 case UNSUBSCRIBE_PARAMETER_UPDATES:
673 handleUnsubscribeParameterUpdates(payload, hdl);
675 case SUBSCRIBE_CONNECTION_GRAPH:
676 handleSubscribeConnectionGraph(hdl);
678 case UNSUBSCRIBE_CONNECTION_GRAPH:
679 handleUnsubscribeConnectionGraph(hdl);
682 handleFetchAsset(payload, hdl);
685 sendStatusAndLogMsg(hdl,
StatusLevel::Error,
"Unrecognized client opcode \"" + op +
"\"");
689 const std::string postfix =
" (op: " + op +
", id: " + std::to_string(e.
id()) +
")";
691 }
catch (
const std::exception& e) {
692 const std::string postfix =
" (op: " + op +
")";
695 const std::string postfix =
" (op: " + op +
")";
700 template <
typename ServerConfiguration>
702 const auto& payload = msg->get_payload();
703 const uint8_t* data =
reinterpret_cast<const uint8_t*
>(payload.data());
704 const size_t length = payload.size();
715 !hasCapability(requiredCapabilityIt->second)) {
717 "Binary operation '" + std::to_string(
static_cast<int>(op)) +
718 "' not supported as server capability '" + requiredCapabilityIt->second +
725 if (!_handlers.clientMessageHandler) {
731 "Invalid message length " + std::to_string(length));
734 const auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
738 std::shared_lock<std::shared_mutex> lock(_clientChannelsMutex);
740 auto clientPublicationsIt = _clientChannels.find(hdl);
741 if (clientPublicationsIt == _clientChannels.end()) {
746 auto& clientPublications = clientPublicationsIt->second;
747 const auto& channelIt = clientPublications.find(channelId);
748 if (channelIt == clientPublications.end()) {
750 "Channel " + std::to_string(channelId) +
" is not advertised");
755 const auto& advertisement = channelIt->second;
756 const uint32_t sequence = 0;
757 const ClientMessage clientMessage{
static_cast<uint64_t
>(timestamp),
758 static_cast<uint64_t
>(timestamp),
763 _handlers.clientMessageHandler(clientMessage, hdl);
767 sendStatusAndLogMsg(hdl,
StatusLevel::Error,
"clientMessage: Failed to execute handler");
772 if (length < request.
size()) {
773 const std::string errMessage =
774 "Invalid service call request length " + std::to_string(length);
775 sendServiceFailure(hdl, request.
serviceId, request.
callId, errMessage);
780 request.
read(data + 1, length - 1);
783 std::shared_lock<std::shared_mutex> lock(_servicesMutex);
784 if (_services.find(request.
serviceId) == _services.end()) {
785 const std::string errMessage =
786 "Service " + std::to_string(request.
serviceId) +
" is not advertised";
787 sendServiceFailure(hdl, request.
serviceId, request.
callId, errMessage);
794 if (!_handlers.serviceRequestHandler) {
798 _handlers.serviceRequestHandler(request, hdl);
799 }
catch (
const std::exception& e) {
806 "Unrecognized client opcode " + std::to_string(uint8_t(op)));
811 template <
typename ServerConfiguration>
813 const std::vector<ChannelWithoutId>& channels) {
814 if (channels.empty()) {
818 std::vector<ChannelId> channelIds;
819 channelIds.reserve(channels.size());
820 json::array_t channelsJson;
823 std::unique_lock<std::shared_mutex> lock(_channelsMutex);
824 for (
const auto& channelWithoutId : channels) {
825 const auto newId = ++_nextChannelId;
826 channelIds.push_back(newId);
827 Channel newChannel{newId, channelWithoutId};
828 channelsJson.push_back(newChannel);
829 _channels.emplace(newId, std::move(newChannel));
833 const auto msg =
json{{
"op",
"advertise"}, {
"channels", channelsJson}}.dump();
834 std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
835 for (
const auto& [hdl, clientInfo] : _clients) {
837 sendJsonRaw(hdl, msg);
843 template <
typename ServerConfiguration>
845 if (channelIds.empty()) {
850 std::unique_lock<std::shared_mutex> channelsLock(_channelsMutex);
851 for (
auto channelId : channelIds) {
852 _channels.erase(channelId);
856 const auto msg =
json{{
"op",
"unadvertise"}, {
"channelIds", channelIds}}.dump();
858 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
859 for (
auto& [hdl, clientInfo] : _clients) {
860 for (
auto channelId : channelIds) {
861 if (
const auto it = clientInfo.subscriptionsByChannel.find(channelId);
862 it != clientInfo.subscriptionsByChannel.end()) {
863 clientInfo.subscriptionsByChannel.erase(it);
866 sendJsonRaw(hdl, msg);
870 template <
typename ServerConfiguration>
872 ConnHandle hdl,
const std::vector<Parameter>& parameters,
873 const std::optional<std::string>& requestId) {
875 std::vector<Parameter> nonEmptyParameters;
876 std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(nonEmptyParameters),
878 return p.getType() != ParameterType::PARAMETER_NOT_SET;
881 nlohmann::json jsonPayload{{
"op",
"parameterValues"}, {
"parameters", nonEmptyParameters}};
883 jsonPayload[
"id"] = requestId.value();
885 sendJsonRaw(hdl, jsonPayload.dump());
888 template <
typename ServerConfiguration>
890 const std::vector<Parameter>& parameters) {
891 std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
892 for (
const auto& clientParamSubscriptions : _clientParamSubscriptions) {
893 std::vector<Parameter> paramsToSendToClient;
896 std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(paramsToSendToClient),
898 return clientParamSubscriptions.second.find(param.getName()) !=
899 clientParamSubscriptions.second.end();
902 if (!paramsToSendToClient.empty()) {
903 publishParameterValues(clientParamSubscriptions.first, paramsToSendToClient);
908 template <
typename ServerConfiguration>
910 const std::vector<ServiceWithoutId>& services) {
911 if (services.empty()) {
915 std::unique_lock<std::shared_mutex> lock(_servicesMutex);
916 std::vector<ServiceId> serviceIds;
918 for (
const auto& service : services) {
919 const ServiceId serviceId = ++_nextServiceId;
920 _services.emplace(serviceId, service);
921 serviceIds.push_back(serviceId);
922 newServices.push_back(
Service(service, serviceId));
925 const auto msg =
json{{
"op",
"advertiseServices"}, {
"services", std::move(newServices)}}.dump();
926 std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
927 for (
const auto& [hdl, clientInfo] : _clients) {
929 sendJsonRaw(hdl, msg);
935 template <
typename ServerConfiguration>
937 std::unique_lock<std::shared_mutex> lock(_servicesMutex);
938 std::vector<ServiceId> removedServices;
939 for (
const auto& serviceId : serviceIds) {
940 if (
const auto it = _services.find(serviceId); it != _services.end()) {
942 removedServices.push_back(serviceId);
946 if (!removedServices.empty()) {
948 json{{
"op",
"unadvertiseServices"}, {
"serviceIds", std::move(removedServices)}}.dump();
949 std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
950 for (
const auto& [hdl, clientInfo] : _clients) {
952 sendJsonRaw(hdl, msg);
957 template <
typename ServerConfiguration>
959 uint64_t timestamp,
const uint8_t* payload,
960 size_t payloadSize) {
961 websocketpp::lib::error_code ec;
962 const auto con = _server.get_con_from_hdl(clientHandle, ec);
967 const auto bufferSizeinBytes = con->get_buffered_amount();
968 if (bufferSizeinBytes + payloadSize >= _options.sendBufferLimitBytes) {
969 const auto logFn = [
this, clientHandle]() {
976 SubscriptionId subId = std::numeric_limits<SubscriptionId>::max();
979 std::shared_lock<std::shared_mutex> lock(_clientsMutex);
980 const auto clientHandleAndInfoIt = _clients.find(clientHandle);
981 if (clientHandleAndInfoIt == _clients.end()) {
985 const auto& client = clientHandleAndInfoIt->second;
986 const auto& subs = client.subscriptionsByChannel.find(chanId);
987 if (subs == client.subscriptionsByChannel.end()) {
990 subId = subs->second;
993 std::array<uint8_t, 1 + 4 + 8> msgHeader;
998 const size_t messageSize = msgHeader.size() + payloadSize;
999 auto message = con->get_message(OpCode::BINARY, messageSize);
1000 message->set_compressed(_options.useCompression);
1002 message->set_payload(msgHeader.data(), msgHeader.size());
1003 message->append_payload(payload, payloadSize);
1007 template <
typename ServerConfiguration>
1009 std::array<uint8_t, 1 + 8> message;
1013 std::shared_lock<std::shared_mutex> lock(_clientsMutex);
1014 for (
const auto& [hdl, clientInfo] : _clients) {
1016 sendBinary(hdl, message.data(), message.size());
1020 template <
typename ServerConfiguration>
1023 std::vector<uint8_t> payload(1 +
response.size());
1025 response.write(payload.data() + 1);
1026 sendBinary(clientHandle, payload.data(), payload.size());
1029 template <
typename ServerConfiguration>
1031 websocketpp::lib::asio::error_code ec;
1032 auto endpoint = _server.get_local_endpoint(ec);
1034 throw std::runtime_error(
"Server not listening on any port. Has it been started before?");
1036 return endpoint.port();
1039 template <
typename ServerConfiguration>
1042 const std::string& message) {
1043 sendJson(clientHandle,
json{{
"op",
"serviceCallFailure"},
1044 {
"serviceId", serviceId},
1046 {
"message", message}});
1049 template <
typename ServerConfiguration>
1053 json::array_t publisherDiff, subscriberDiff, servicesDiff;
1054 std::unordered_set<std::string> topicNames, serviceNames;
1055 std::unordered_set<std::string> knownTopicNames, knownServiceNames;
1057 std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1058 for (
const auto& [name, publisherIds] : publishedTopics) {
1059 const auto it = _connectionGraph.publishedTopics.find(name);
1060 if (it == _connectionGraph.publishedTopics.end() ||
1061 _connectionGraph.publishedTopics[name] != publisherIds) {
1062 publisherDiff.push_back(
nlohmann::json{{
"name", name}, {
"publisherIds", publisherIds}});
1064 topicNames.insert(name);
1066 for (
const auto& [name, subscriberIds] : subscribedTopics) {
1067 const auto it = _connectionGraph.subscribedTopics.find(name);
1068 if (it == _connectionGraph.subscribedTopics.end() ||
1069 _connectionGraph.subscribedTopics[name] != subscriberIds) {
1070 subscriberDiff.push_back(
nlohmann::json{{
"name", name}, {
"subscriberIds", subscriberIds}});
1072 topicNames.insert(name);
1074 for (
const auto& [name, providerIds] : advertisedServices) {
1075 const auto it = _connectionGraph.advertisedServices.find(name);
1076 if (it == _connectionGraph.advertisedServices.end() ||
1077 _connectionGraph.advertisedServices[name] != providerIds) {
1078 servicesDiff.push_back(
nlohmann::json{{
"name", name}, {
"providerIds", providerIds}});
1080 serviceNames.insert(name);
1083 for (
const auto& nameWithIds : _connectionGraph.publishedTopics) {
1084 knownTopicNames.insert(nameWithIds.first);
1086 for (
const auto& nameWithIds : _connectionGraph.subscribedTopics) {
1087 knownTopicNames.insert(nameWithIds.first);
1089 for (
const auto& nameWithIds : _connectionGraph.advertisedServices) {
1090 knownServiceNames.insert(nameWithIds.first);
1093 _connectionGraph.publishedTopics = publishedTopics;
1094 _connectionGraph.subscribedTopics = subscribedTopics;
1095 _connectionGraph.advertisedServices = advertisedServices;
1098 std::vector<std::string> removedTopics, removedServices;
1099 std::copy_if(knownTopicNames.begin(), knownTopicNames.end(), std::back_inserter(removedTopics),
1100 [&topicNames](
const std::string& topic) {
1101 return topicNames.find(topic) == topicNames.end();
1103 std::copy_if(knownServiceNames.begin(), knownServiceNames.end(),
1104 std::back_inserter(removedServices), [&serviceNames](
const std::string& service) {
1105 return serviceNames.find(service) == serviceNames.end();
1108 if (publisherDiff.empty() && subscriberDiff.empty() && servicesDiff.empty() &&
1109 removedTopics.empty() && removedServices.empty()) {
1114 {
"op",
"connectionGraphUpdate"}, {
"publishedTopics", publisherDiff},
1115 {
"subscribedTopics", subscriberDiff}, {
"advertisedServices", servicesDiff},
1116 {
"removedTopics", removedTopics}, {
"removedServices", removedServices},
1118 const auto payload = msg.dump();
1120 std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1121 for (
const auto& [hdl, clientInfo] : _clients) {
1122 if (clientInfo.subscribedToConnectionGraph) {
1123 _server.send(hdl, payload, OpCode::TEXT);
1128 template <
typename ServerConfiguration>
1130 websocketpp::lib::error_code ec;
1131 const auto con = _server.get_con_from_hdl(clientHandle, ec);
1132 return con ? con->get_remote_endpoint() :
"(unknown)";
1135 template <
typename ServerConfiguration>
1137 return std::find_if(_clientParamSubscriptions.begin(), _clientParamSubscriptions.end(),
1138 [paramName](
const auto& paramSubscriptions) {
1139 return paramSubscriptions.second.find(paramName) !=
1140 paramSubscriptions.second.end();
1141 }) != _clientParamSubscriptions.end();
1144 template <
typename ServerConfiguration>
1146 ConnHandle hdl,
const std::unordered_set<std::string>& paramNames) {
1147 std::vector<std::string> paramsToUnsubscribe;
1149 std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1150 std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToUnsubscribe),
1151 [
this](
const std::string& paramName) {
1152 return !isParameterSubscribed(paramName);
1156 if (_handlers.parameterSubscriptionHandler && !paramsToUnsubscribe.empty()) {
1157 for (
const auto&
param : paramsToUnsubscribe) {
1158 _server.get_alog().write(
APP,
"Unsubscribing from parameter '" +
param +
"'.");
1162 _handlers.parameterSubscriptionHandler(paramsToUnsubscribe,
1164 }
catch (
const std::exception& e) {
1168 "Failed to unsubscribe from one more more parameters");
1173 template <
typename ServerConfiguration>
1175 return std::find(_options.capabilities.begin(), _options.capabilities.end(), capability) !=
1176 _options.capabilities.end();
1179 template <
typename ServerConfiguration>
1183 return bool(_handlers.subscribeHandler);
1185 return bool(_handlers.unsubscribeHandler);
1187 return bool(_handlers.clientAdvertiseHandler);
1189 return bool(_handlers.clientUnadvertiseHandler);
1190 case GET_PARAMETERS:
1191 return bool(_handlers.parameterRequestHandler);
1192 case SET_PARAMETERS:
1193 return bool(_handlers.parameterChangeHandler);
1194 case SUBSCRIBE_PARAMETER_UPDATES:
1195 case UNSUBSCRIBE_PARAMETER_UPDATES:
1196 return bool(_handlers.parameterSubscriptionHandler);
1197 case SUBSCRIBE_CONNECTION_GRAPH:
1198 case UNSUBSCRIBE_CONNECTION_GRAPH:
1199 return bool(_handlers.subscribeConnectionGraphHandler);
1201 return bool(_handlers.fetchAssetHandler);
1203 throw std::runtime_error(
"Unknown operation: " + std::to_string(op));
1207 template <
typename ServerConfiguration>
1209 std::unordered_map<ChannelId, SubscriptionId> clientSubscriptionsByChannel;
1211 std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1212 clientSubscriptionsByChannel = _clients.at(hdl).subscriptionsByChannel;
1215 const auto findSubscriptionBySubId =
1216 [](
const std::unordered_map<ChannelId, SubscriptionId>& subscriptionsByChannel,
1218 return std::find_if(subscriptionsByChannel.begin(), subscriptionsByChannel.end(),
1219 [&subId](
const auto& mo) {
1220 return mo.second == subId;
1224 for (
const auto& sub : payload.at(
"subscriptions")) {
1226 ChannelId channelId = sub.at(
"channelId");
1227 if (findSubscriptionBySubId(clientSubscriptionsByChannel, subId) !=
1228 clientSubscriptionsByChannel.end()) {
1230 "Client subscription id " + std::to_string(subId) +
1231 " was already used; ignoring subscription");
1234 const auto& channelIt = _channels.find(channelId);
1235 if (channelIt == _channels.end()) {
1236 sendStatusAndLogMsg(
1238 "Channel " + std::to_string(channelId) +
" is not available; ignoring subscription");
1242 _handlers.subscribeHandler(channelId, hdl);
1243 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1244 _clients.at(hdl).subscriptionsByChannel.emplace(channelId, subId);
1248 template <
typename ServerConfiguration>
1250 std::unordered_map<ChannelId, SubscriptionId> clientSubscriptionsByChannel;
1252 std::shared_lock<std::shared_mutex> clientsLock(_clientsMutex);
1253 clientSubscriptionsByChannel = _clients.at(hdl).subscriptionsByChannel;
1256 const auto findSubscriptionBySubId =
1257 [](
const std::unordered_map<ChannelId, SubscriptionId>& subscriptionsByChannel,
1259 return std::find_if(subscriptionsByChannel.begin(), subscriptionsByChannel.end(),
1260 [&subId](
const auto& mo) {
1261 return mo.second == subId;
1265 for (
const auto& subIdJson : payload.at(
"subscriptionIds")) {
1267 const auto& sub = findSubscriptionBySubId(clientSubscriptionsByChannel, subId);
1268 if (sub == clientSubscriptionsByChannel.end()) {
1270 "Client subscription id " + std::to_string(subId) +
1271 " did not exist; ignoring unsubscription");
1276 _handlers.unsubscribeHandler(chanId, hdl);
1277 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1278 _clients.at(hdl).subscriptionsByChannel.erase(chanId);
1282 template <
typename ServerConfiguration>
1284 std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
1285 auto [clientPublicationsIt, isFirstPublication] =
1286 _clientChannels.emplace(hdl, std::unordered_map<ClientChannelId, ClientAdvertisement>());
1288 auto& clientPublications = clientPublicationsIt->second;
1290 for (
const auto& chan : payload.at(
"channels")) {
1292 if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) {
1294 "Channel " + std::to_string(channelId) +
" was already advertised");
1298 const auto topic = chan.at(
"topic").get<std::string>();
1299 if (!
isWhitelisted(topic, _options.clientTopicWhitelistPatterns)) {
1301 "Can't advertise channel " + std::to_string(channelId) +
", topic '" +
1302 topic +
"' not whitelisted");
1307 advertisement.topic = topic;
1308 advertisement.encoding = chan.at(
"encoding").get<std::string>();
1309 advertisement.schemaName = chan.at(
"schemaName").get<std::string>();
1311 _handlers.clientAdvertiseHandler(advertisement, hdl);
1312 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1313 _clients.at(hdl).advertisedChannels.emplace(channelId);
1314 clientPublications.emplace(channelId, advertisement);
1318 template <
typename ServerConfiguration>
1320 std::unique_lock<std::shared_mutex> clientChannelsLock(_clientChannelsMutex);
1321 auto clientPublicationsIt = _clientChannels.find(hdl);
1322 if (clientPublicationsIt == _clientChannels.end()) {
1327 auto& clientPublications = clientPublicationsIt->second;
1329 for (
const auto& chanIdJson : payload.at(
"channelIds")) {
1331 const auto& channelIt = clientPublications.find(channelId);
1332 if (channelIt == clientPublications.end()) {
1336 _handlers.clientUnadvertiseHandler(channelId, hdl);
1337 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1338 auto& clientInfo = _clients.at(hdl);
1339 clientPublications.erase(channelIt);
1340 const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId);
1341 if (advertisedChannelIt != clientInfo.advertisedChannels.end()) {
1342 clientInfo.advertisedChannels.erase(advertisedChannelIt);
1347 template <
typename ServerConfiguration>
1350 const auto paramNames = payload.at(
"parameterNames").get<std::vector<std::string>>();
1351 const auto requestId = payload.find(
"id") == payload.end()
1353 : std::optional<std::string>(payload[
"id"].get<std::string>());
1354 _handlers.parameterRequestHandler(paramNames, requestId, hdl);
1357 template <
typename ServerConfiguration>
1360 const auto parameters = payload.at(
"parameters").get<std::vector<Parameter>>();
1361 const auto requestId = payload.find(
"id") == payload.end()
1363 : std::optional<std::string>(payload[
"id"].get<std::string>());
1364 _handlers.parameterChangeHandler(parameters, requestId, hdl);
1367 template <
typename ServerConfiguration>
1370 const auto paramNames = payload.at(
"parameterNames").get<std::unordered_set<std::string>>();
1371 std::vector<std::string> paramsToSubscribe;
1374 std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1375 std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToSubscribe),
1376 [
this](
const std::string& paramName) {
1377 return !isParameterSubscribed(paramName);
1381 auto& clientSubscribedParams = _clientParamSubscriptions[hdl];
1382 clientSubscribedParams.insert(paramNames.begin(), paramNames.end());
1385 if (!paramsToSubscribe.empty()) {
1386 _handlers.parameterSubscriptionHandler(paramsToSubscribe,
1391 template <
typename ServerConfiguration>
1394 const auto paramNames = payload.at(
"parameterNames").get<std::unordered_set<std::string>>();
1396 std::lock_guard<std::mutex> lock(_clientParamSubscriptionsMutex);
1397 auto& clientSubscribedParams = _clientParamSubscriptions[hdl];
1398 for (
const auto& paramName : paramNames) {
1399 clientSubscribedParams.erase(paramName);
1403 unsubscribeParamsWithoutSubscriptions(hdl, paramNames);
1406 template <
typename ServerConfiguration>
1408 bool subscribeToConnnectionGraph =
false;
1410 std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1411 _connectionGraph.subscriptionCount++;
1412 subscribeToConnnectionGraph = _connectionGraph.subscriptionCount == 1;
1415 if (subscribeToConnnectionGraph) {
1417 _server.get_alog().write(
APP,
"Subscribing to connection graph updates.");
1418 _handlers.subscribeConnectionGraphHandler(
true);
1419 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1420 _clients.at(hdl).subscribedToConnectionGraph =
true;
1423 json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson;
1425 std::shared_lock<std::shared_mutex> lock(_connectionGraphMutex);
1426 for (
const auto& [name, ids] : _connectionGraph.publishedTopics) {
1427 publishedTopicsJson.push_back(
nlohmann::json{{
"name", name}, {
"publisherIds", ids}});
1429 for (
const auto& [name, ids] : _connectionGraph.subscribedTopics) {
1430 subscribedTopicsJson.push_back(
nlohmann::json{{
"name", name}, {
"subscriberIds", ids}});
1432 for (
const auto& [name, ids] : _connectionGraph.advertisedServices) {
1433 advertisedServicesJson.push_back(
nlohmann::json{{
"name", name}, {
"providerIds", ids}});
1437 const json jsonMsg = {
1438 {
"op",
"connectionGraphUpdate"},
1439 {
"publishedTopics", publishedTopicsJson},
1440 {
"subscribedTopics", subscribedTopicsJson},
1441 {
"advertisedServices", advertisedServicesJson},
1442 {
"removedTopics", json::array()},
1443 {
"removedServices", json::array()},
1446 sendJsonRaw(hdl, jsonMsg.dump());
1449 template <
typename ServerConfiguration>
1451 bool clientWasSubscribed =
false;
1453 std::unique_lock<std::shared_mutex> clientsLock(_clientsMutex);
1454 auto& clientInfo = _clients.at(hdl);
1455 if (clientInfo.subscribedToConnectionGraph) {
1456 clientWasSubscribed =
true;
1457 clientInfo.subscribedToConnectionGraph =
false;
1461 if (clientWasSubscribed) {
1462 bool unsubscribeFromConnnectionGraph =
false;
1464 std::unique_lock<std::shared_mutex> lock(_connectionGraphMutex);
1465 _connectionGraph.subscriptionCount--;
1466 unsubscribeFromConnnectionGraph = _connectionGraph.subscriptionCount == 0;
1468 if (unsubscribeFromConnnectionGraph) {
1469 _server.get_alog().write(
APP,
"Unsubscribing from connection graph updates.");
1470 _handlers.subscribeConnectionGraphHandler(
false);
1474 "Client was not subscribed to connection graph updates");
1478 template <
typename ServerConfiguration>
1480 const auto uri = payload.at(
"uri").get<std::string>();
1481 const auto requestId = payload.at(
"requestId").get<uint32_t>();
1482 _handlers.fetchAssetHandler(uri, requestId, hdl);
1485 template <
typename ServerConfiguration>
1488 websocketpp::lib::error_code ec;
1489 const auto con = _server.get_con_from_hdl(clientHandle, ec);
1494 const size_t errMsgSize =
1497 const size_t messageSize = 1 + 4 + 1 + 4 + errMsgSize + dataSize;
1499 auto message = con->get_message(OpCode::BINARY, messageSize);
1502 message->append_payload(&op, 1);
1504 std::array<uint8_t, 4> uint32Data;
1506 message->append_payload(uint32Data.data(), uint32Data.size());
1508 const uint8_t status =
static_cast<uint8_t
>(
response.status);
1509 message->append_payload(&status, 1);
1512 message->append_payload(uint32Data.data(), uint32Data.size());
1513 message->append_payload(
response.errorMessage.data(), errMsgSize);
1515 message->append_payload(
response.data.data(), dataSize);