10 #include <shared_mutex> 11 #include <string_view> 13 #include <unordered_map> 14 #include <unordered_set> 17 #include <nlohmann/json.hpp> 18 #include <websocketpp/config/asio.hpp> 19 #include <websocketpp/server.hpp> 31 #define FOXGLOVE_DEBOUNCE(f, ms) \ 33 static auto last_call = std::chrono::system_clock::now(); \ 34 const auto now = std::chrono::system_clock::now(); \ 35 if (std::chrono::duration_cast<std::chrono::milliseconds>(now - last_call).count() > ms) { \ 46 using OpCode = websocketpp::frame::opcode::value;
48 static const websocketpp::log::level
APP = websocketpp::log::alevel::app;
49 static const websocketpp::log::level
WARNING = websocketpp::log::elevel::warn;
50 static const websocketpp::log::level
RECOVERABLE = websocketpp::log::elevel::rerror;
52 constexpr uint32_t
Integer(
const std::string_view str) {
53 uint32_t result = 0x811C9DC5;
55 result = (
static_cast<uint32_t
>(c) ^ result) * 0x01000193;
99 template <
typename ServerConfiguration>
105 using Tcp = websocketpp::lib::asio::ip::tcp;
115 void start(
const std::string& host, uint16_t port)
override;
116 void stop()
override;
118 std::vector<ChannelId> addChannels(
const std::vector<ChannelWithoutId>& channels)
override;
119 void removeChannels(
const std::vector<ChannelId>& channelIds)
override;
120 void publishParameterValues(
ConnHandle clientHandle,
const std::vector<Parameter>& parameters,
121 const std::optional<std::string>& requestId = std::nullopt)
override;
122 void updateParameterValues(
const std::vector<Parameter>& parameters)
override;
123 std::vector<ServiceId> addServices(
const std::vector<ServiceWithoutId>& services)
override;
124 void removeServices(
const std::vector<ServiceId>& serviceIds)
override;
129 const uint8_t* payload,
size_t payloadSize)
override;
130 void broadcastTime(uint64_t timestamp)
override;
132 void updateConnectionGraph(
const MapOfSets& publishedTopics,
const MapOfSets& subscribedTopics,
133 const MapOfSets& advertisedServices)
override;
136 std::string remoteEndpointString(
ConnHandle clientHandle)
override;
144 bool subscribedToConnectionGraph =
false;
164 uint32_t _nextChannelId = 0;
165 std::map<ConnHandle, ClientInfo, std::owner_less<>>
_clients;
167 std::map<ConnHandle, std::unordered_map<ClientChannelId, ClientAdvertisement>, std::owner_less<>>
169 std::map<ConnHandle, std::unordered_set<std::string>, std::owner_less<>>
172 std::unordered_map<ServiceId, ServiceWithoutId>
_services;
181 int subscriptionCount = 0;
188 void setupTlsHandler();
198 void sendJsonRaw(
ConnHandle hdl,
const std::string& payload);
199 void sendBinary(
ConnHandle hdl,
const uint8_t* payload,
size_t payloadSize);
201 const std::string& message);
202 void unsubscribeParamsWithoutSubscriptions(
ConnHandle hdl,
203 const std::unordered_set<std::string>& paramNames);
204 bool isParameterSubscribed(
const std::string& paramName)
const;
205 bool hasCapability(
const std::string& capability)
const;
208 template <
typename ServerConfiguration>
211 : _name(
std::move(name))
213 , _options(options) {
221 throw std::runtime_error(
"Failed to initialize websocket server: " + ec.message());
224 _server.clear_access_channels(websocketpp::log::alevel::all);
225 _server.set_access_channels(APP);
235 _server.set_listen_backlog(128);
241 template <
typename ServerConfiguration>
244 template <
typename ServerConfiguration>
247 _server.get_con_from_hdl(hdl)->get_raw_socket().set_option(Tcp::no_delay(
true), ec);
249 _server.get_elog().write(RECOVERABLE,
"Failed to set TCP_NODELAY: " + ec.message());
253 template <
typename ServerConfiguration>
255 auto con =
_server.get_con_from_hdl(hdl);
257 const auto& subprotocols = con->get_requested_subprotocols();
259 subprotocols.end()) {
264 " which did not declare support for subprotocol " +
269 template <
typename ServerConfiguration>
271 auto con =
_server.get_con_from_hdl(hdl);
273 _server.get_alog().write(APP,
"Client " + endpoint +
" connected via " + con->get_resource());
281 {
"op",
"serverInfo"},
290 std::vector<Channel> channels;
293 for (
const auto& [
id, channel] :
_channels) {
295 channels.push_back(channel);
300 {
"channels", std::move(channels)},
303 std::vector<Service> services;
306 for (
const auto& [
id, service] :
_services) {
307 services.push_back(
Service(service,
id));
311 {
"op",
"advertiseServices"},
312 {
"services", std::move(services)},
316 template <
typename ServerConfiguration>
318 std::unordered_map<ChannelId, SubscriptionId> oldSubscriptionsByChannel;
319 std::unordered_set<ClientChannelId> oldAdvertisedChannels;
320 std::string clientName;
321 bool wasSubscribedToConnectionGraph;
324 const auto clientIt =
_clients.find(hdl);
327 " disconnected but not found in _clients");
331 const auto& client = clientIt->second;
332 clientName = client.name;
333 _server.get_alog().write(APP,
"Client " + clientName +
" disconnected");
335 oldSubscriptionsByChannel = std::move(client.subscriptionsByChannel);
336 oldAdvertisedChannels = std::move(client.advertisedChannels);
337 wasSubscribedToConnectionGraph = client.subscribedToConnectionGraph;
342 for (
const auto clientChannelId : oldAdvertisedChannels) {
343 _server.get_alog().write(APP,
"Client " + clientName +
" unadvertising channel " +
344 std::to_string(clientChannelId) +
" due to disconnect");
357 for (
const auto& [chanId, subs] : oldSubscriptionsByChannel) {
364 std::unordered_set<std::string> clientSubscribedParameters;
372 if (wasSubscribedToConnectionGraph) {
376 _server.get_alog().write(APP,
"Unsubscribing from connection graph updates.");
383 template <
typename ServerConfiguration>
388 template <
typename ServerConfiguration>
394 _server.get_alog().write(APP,
"Stopping WebSocket server");
402 _server.get_elog().write(RECOVERABLE,
"Failed to stop listening: " + ec.message());
406 std::vector<std::shared_ptr<ConnectionType>> connections;
409 connections.reserve(
_clients.size());
410 for (
const auto& [hdl, client] :
_clients) {
412 if (
auto connection =
_server.get_con_from_hdl(hdl, ec)) {
413 connections.push_back(connection);
418 if (!connections.empty()) {
420 APP,
"Closing " + std::to_string(connections.size()) +
" client connection(s)");
423 for (
const auto& connection : connections) {
424 connection->close(websocketpp::close::status::going_away,
"server shutdown", ec);
426 _server.get_elog().write(RECOVERABLE,
"Failed to close connection: " + ec.message());
431 constexpr
size_t MAX_SHUTDOWN_MS = 1000;
432 constexpr
size_t SLEEP_MS = 10;
433 size_t durationMs = 0;
434 while (!
_server.stopped() && durationMs < MAX_SHUTDOWN_MS) {
435 std::this_thread::sleep_for(std::chrono::milliseconds(SLEEP_MS));
437 durationMs += SLEEP_MS;
441 _server.get_elog().write(RECOVERABLE,
"Failed to close all connections, forcefully stopping");
442 for (
const auto& hdl : connections) {
443 if (
auto con =
_server.get_con_from_hdl(hdl, ec)) {
444 _server.get_elog().write(RECOVERABLE,
453 _server.get_alog().write(APP,
"All WebSocket connections closed");
456 _server.get_alog().write(APP,
"Waiting for WebSocket server run loop to terminate");
459 _server.get_alog().write(APP,
"WebSocket server run loop terminated");
466 template <
typename ServerConfiguration>
469 throw std::runtime_error(
"Server already started");
474 _server.listen(host, std::to_string(port), ec);
476 throw std::runtime_error(
"Failed to listen on port " + std::to_string(port) +
": " +
482 throw std::runtime_error(
"Failed to start accepting connections: " + ec.message());
486 _server.get_alog().write(APP,
"WebSocket server run loop started");
488 _server.get_alog().write(APP,
"WebSocket server run loop stopped");
492 throw std::runtime_error(
"WebSocket server failed to listen on port " + std::to_string(port));
495 auto endpoint =
_server.get_local_endpoint(ec);
497 throw std::runtime_error(
"Failed to resolve the local endpoint: " + ec.message());
501 auto address = endpoint.address();
502 _server.get_alog().write(APP,
"WebSocket server listening at " + protocol +
"://" +
504 std::to_string(endpoint.port()));
507 template <
typename ServerConfiguration>
510 _server.send(hdl, std::move(payload).dump(), OpCode::TEXT);
511 }
catch (std::exception
const& e) {
512 _server.get_elog().write(RECOVERABLE, e.what());
516 template <
typename ServerConfiguration>
519 _server.send(hdl, payload, OpCode::TEXT);
520 }
catch (std::exception
const& e) {
521 _server.get_elog().write(RECOVERABLE, e.what());
525 template <
typename ServerConfiguration>
527 size_t payloadSize) {
529 _server.send(hdl, payload, payloadSize, OpCode::BINARY);
530 }
catch (std::exception
const& e) {
531 _server.get_elog().write(RECOVERABLE, e.what());
535 template <
typename ServerConfiguration>
538 const std::string& message) {
540 const std::string logMessage = endpoint +
": " + message;
543 logger.write(logLevel, logMessage);
547 {
"level",
static_cast<uint8_t
>(level)},
548 {
"message", message},
552 template <
typename ServerConfiguration>
554 const OpCode op = msg->get_opcode();
562 }
catch (
const std::exception& e) {
566 "Exception occurred when executing text message handler");
570 case OpCode::BINARY: {
574 }
catch (
const std::exception& e) {
578 "Exception occurred when executing binary message handler");
585 }
catch (std::exception
const& ex) {
587 std::string{
"Error parsing message: "} + ex.what());
591 template <
typename ServerConfiguration>
593 const json payload = json::parse(msg->get_payload());
594 const std::string& op = payload.at(
"op").get<std::string>();
596 const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_OPERATION.find(op);
597 if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_OPERATION.end() &&
600 "Operation '" + op +
"' not supported as server capability '" +
601 requiredCapabilityIt->second +
"' is missing");
605 std::shared_lock<std::shared_mutex> clientsLock(
_clientsMutex);
606 auto& clientInfo =
_clients.at(hdl);
608 const auto findSubscriptionBySubId = [&clientInfo](
SubscriptionId subId) {
609 return std::find_if(clientInfo.subscriptionsByChannel.begin(),
610 clientInfo.subscriptionsByChannel.end(), [&subId](
const auto& mo) {
611 return mo.second == subId;
617 constexpr
auto ADVERTISE =
Integer(
"advertise");
618 constexpr
auto UNADVERTISE =
Integer(
"unadvertise");
619 constexpr
auto GET_PARAMETERS =
Integer(
"getParameters");
620 constexpr
auto SET_PARAMETERS =
Integer(
"setParameters");
621 constexpr
auto SUBSCRIBE_PARAMETER_UPDATES =
Integer(
"subscribeParameterUpdates");
622 constexpr
auto UNSUBSCRIBE_PARAMETER_UPDATES =
Integer(
"unsubscribeParameterUpdates");
623 constexpr
auto SUBSCRIBE_CONNECTION_GRAPH =
Integer(
"subscribeConnectionGraph");
624 constexpr
auto UNSUBSCRIBE_CONNECTION_GRAPH =
Integer(
"unsubscribeConnectionGraph");
631 for (
const auto& sub : payload.at(
"subscriptions")) {
633 ChannelId channelId = sub.at(
"channelId");
634 if (findSubscriptionBySubId(subId) != clientInfo.subscriptionsByChannel.end()) {
636 "Client subscription id " + std::to_string(subId) +
637 " was already used; ignoring subscription");
640 const auto& channelIt =
_channels.find(channelId);
644 "Channel " + std::to_string(channelId) +
" is not available; ignoring subscription");
650 clientInfo.subscriptionsByChannel.emplace(channelId, subId);
662 for (
const auto& subIdJson : payload.at(
"subscriptionIds")) {
664 const auto& sub = findSubscriptionBySubId(subId);
665 if (sub == clientInfo.subscriptionsByChannel.end()) {
667 "Client subscription id " + std::to_string(subId) +
668 " did not exist; ignoring unsubscription");
675 clientInfo.subscriptionsByChannel.erase(sub);
688 auto [clientPublicationsIt, isFirstPublication] =
689 _clientChannels.emplace(hdl, std::unordered_map<ClientChannelId, ClientAdvertisement>());
691 auto& clientPublications = clientPublicationsIt->second;
693 for (
const auto& chan : payload.at(
"channels")) {
695 if (!isFirstPublication && clientPublications.find(channelId) != clientPublications.end()) {
697 "Channel " + std::to_string(channelId) +
" was already advertised");
701 const auto topic = chan.at(
"topic").get<std::string>();
704 "Can't advertise channel " + std::to_string(channelId) +
", topic '" +
705 topic +
"' not whitelisted");
710 advertisement.topic = topic;
711 advertisement.encoding = chan.at(
"encoding").get<std::string>();
712 advertisement.schemaName = chan.at(
"schemaName").get<std::string>();
716 clientPublications.emplace(channelId, advertisement);
717 clientInfo.advertisedChannels.emplace(channelId);
736 auto& clientPublications = clientPublicationsIt->second;
738 for (
const auto& chanIdJson : payload.at(
"channelIds")) {
740 const auto& channelIt = clientPublications.find(channelId);
741 if (channelIt == clientPublications.end()) {
747 clientPublications.erase(channelIt);
748 const auto advertisedChannelIt = clientInfo.advertisedChannels.find(channelId);
749 if (advertisedChannelIt != clientInfo.advertisedChannels.end()) {
750 clientInfo.advertisedChannels.erase(advertisedChannelIt);
759 case GET_PARAMETERS: {
764 const auto paramNames = payload.at(
"parameterNames").get<std::vector<std::string>>();
765 const auto requestId = payload.find(
"id") == payload.end()
767 : std::optional<std::string>(payload[
"id"].get<std::string>());
771 }
catch (
const std::exception& e) {
777 case SET_PARAMETERS: {
782 const auto parameters = payload.at(
"parameters").get<std::vector<Parameter>>();
783 const auto requestId = payload.find(
"id") == payload.end()
785 : std::optional<std::string>(payload[
"id"].get<std::string>());
788 }
catch (
const std::exception& e) {
794 case SUBSCRIBE_PARAMETER_UPDATES: {
799 const auto paramNames = payload.at(
"parameterNames").get<std::unordered_set<std::string>>();
800 std::vector<std::string> paramsToSubscribe;
804 std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToSubscribe),
805 [
this](
const std::string& paramName) {
811 clientSubscribedParams.insert(paramNames.begin(), paramNames.end());
814 if (paramsToSubscribe.empty()) {
821 }
catch (
const std::exception& e) {
827 case UNSUBSCRIBE_PARAMETER_UPDATES: {
832 const auto paramNames = payload.at(
"parameterNames").get<std::unordered_set<std::string>>();
836 for (
const auto& paramName : paramNames) {
837 clientSubscribedParams.erase(paramName);
843 case SUBSCRIBE_CONNECTION_GRAPH: {
848 bool subscribeToConnnectionGraph =
false;
855 if (subscribeToConnnectionGraph) {
857 _server.get_alog().write(APP,
"Subscribing to connection graph updates.");
859 clientInfo.subscribedToConnectionGraph =
true;
862 json::array_t publishedTopicsJson, subscribedTopicsJson, advertisedServicesJson;
866 publishedTopicsJson.push_back(
nlohmann::json{{
"name", name}, {
"publisherIds", ids}});
869 subscribedTopicsJson.push_back(
nlohmann::json{{
"name", name}, {
"subscriberIds", ids}});
872 advertisedServicesJson.push_back(
nlohmann::json{{
"name", name}, {
"providerIds", ids}});
876 const json jsonMsg = {
877 {
"op",
"connectionGraphUpdate"},
878 {
"publishedTopics", publishedTopicsJson},
879 {
"subscribedTopics", subscribedTopicsJson},
880 {
"advertisedServices", advertisedServicesJson},
881 {
"removedTopics", json::array()},
882 {
"removedServices", json::array()},
887 case UNSUBSCRIBE_CONNECTION_GRAPH: {
892 if (clientInfo.subscribedToConnectionGraph) {
893 clientInfo.subscribedToConnectionGraph =
false;
894 bool unsubscribeFromConnnectionGraph =
false;
900 if (unsubscribeFromConnnectionGraph) {
901 _server.get_alog().write(APP,
"Unsubscribing from connection graph updates.");
906 "Client was not subscribed to connection graph updates");
915 template <
typename ServerConfiguration>
917 const auto& payload = msg->get_payload();
918 const uint8_t* data =
reinterpret_cast<const uint8_t*
>(payload.data());
919 const size_t length = payload.size();
928 const auto requiredCapabilityIt = CAPABILITY_BY_CLIENT_BINARY_OPERATION.find(op);
929 if (requiredCapabilityIt != CAPABILITY_BY_CLIENT_BINARY_OPERATION.end() &&
932 "Binary operation '" + std::to_string(static_cast<int>(op)) +
933 "' not supported as server capability '" + requiredCapabilityIt->second +
946 "Invalid message length " + std::to_string(length));
949 const auto timestamp = std::chrono::duration_cast<std::chrono::nanoseconds>(
950 std::chrono::high_resolution_clock::now().time_since_epoch())
961 auto& clientPublications = clientPublicationsIt->second;
962 const auto& channelIt = clientPublications.find(channelId);
963 if (channelIt == clientPublications.end()) {
965 "Channel " + std::to_string(channelId) +
" is not advertised");
970 const auto& advertisement = channelIt->second;
971 const uint32_t sequence = 0;
972 const ClientMessage clientMessage{
static_cast<uint64_t
>(timestamp),
973 static_cast<uint64_t>(timestamp),
987 if (length < request.
size()) {
989 "Invalid service call request length " + std::to_string(length));
993 request.
read(data + 1, length - 1);
1000 "Service " + std::to_string(request.
serviceId) +
" is not advertised");
1011 "Unrecognized client opcode " + std::to_string(uint8_t(op)));
1016 template <
typename ServerConfiguration>
1018 const std::vector<ChannelWithoutId>& channels) {
1019 if (channels.empty()) {
1023 std::vector<ChannelId> channelIds;
1024 channelIds.reserve(channels.size());
1025 json::array_t channelsJson;
1029 for (
const auto& channelWithoutId : channels) {
1031 channelIds.push_back(newId);
1032 Channel newChannel{newId, channelWithoutId};
1033 channelsJson.push_back(newChannel);
1034 _channels.emplace(newId, std::move(newChannel));
1038 const auto msg =
json{{
"op",
"advertise"}, {
"channels", channelsJson}}.dump();
1039 std::shared_lock<std::shared_mutex> clientsLock(
_clientsMutex);
1040 for (
const auto& [hdl, clientInfo] :
_clients) {
1048 template <
typename ServerConfiguration>
1050 if (channelIds.empty()) {
1055 std::unique_lock<std::shared_mutex> channelsLock(
_channelsMutex);
1056 for (
auto channelId : channelIds) {
1061 const auto msg =
json{{
"op",
"unadvertise"}, {
"channelIds", channelIds}}.dump();
1063 std::unique_lock<std::shared_mutex> clientsLock(
_clientsMutex);
1064 for (
auto& [hdl, clientInfo] :
_clients) {
1065 for (
auto channelId : channelIds) {
1066 if (
const auto it = clientInfo.subscriptionsByChannel.find(channelId);
1067 it != clientInfo.subscriptionsByChannel.end()) {
1068 clientInfo.subscriptionsByChannel.erase(it);
1075 template <
typename ServerConfiguration>
1077 ConnHandle hdl,
const std::vector<Parameter>& parameters,
1078 const std::optional<std::string>& requestId) {
1080 std::vector<Parameter> nonEmptyParameters;
1081 std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(nonEmptyParameters),
1086 nlohmann::json jsonPayload{{
"op",
"parameterValues"}, {
"parameters", nonEmptyParameters}};
1088 jsonPayload[
"id"] = requestId.value();
1093 template <
typename ServerConfiguration>
1095 const std::vector<Parameter>& parameters) {
1098 std::vector<Parameter> paramsToSendToClient;
1101 std::copy_if(parameters.begin(), parameters.end(), std::back_inserter(paramsToSendToClient),
1103 return clientParamSubscriptions.second.find(
param.getName()) !=
1104 clientParamSubscriptions.second.end();
1107 if (!paramsToSendToClient.empty()) {
1113 template <
typename ServerConfiguration>
1115 const std::vector<ServiceWithoutId>& services) {
1116 if (services.empty()) {
1121 std::vector<ServiceId> serviceIds;
1123 for (
const auto& service : services) {
1126 serviceIds.push_back(serviceId);
1127 newServices.push_back(
Service(service, serviceId));
1130 const auto msg =
json{{
"op",
"advertiseServices"}, {
"services", std::move(newServices)}}.dump();
1131 std::shared_lock<std::shared_mutex> clientsLock(
_clientsMutex);
1132 for (
const auto& [hdl, clientInfo] :
_clients) {
1140 template <
typename ServerConfiguration>
1143 std::vector<ServiceId> removedServices;
1144 for (
const auto& serviceId : serviceIds) {
1147 removedServices.push_back(serviceId);
1151 if (!removedServices.empty()) {
1153 json{{
"op",
"unadvertiseServices"}, {
"serviceIds", std::move(removedServices)}}.dump();
1154 std::shared_lock<std::shared_mutex> clientsLock(
_clientsMutex);
1155 for (
const auto& [hdl, clientInfo] :
_clients) {
1162 template <
typename ServerConfiguration>
1164 uint64_t timestamp,
const uint8_t* payload,
1165 size_t payloadSize) {
1167 const auto con =
_server.get_con_from_hdl(clientHandle, ec);
1172 const auto bufferSizeinBytes = con->get_buffered_amount();
1174 const auto logFn = [
this, clientHandle]() {
1181 SubscriptionId subId = std::numeric_limits<SubscriptionId>::max();
1185 const auto clientHandleAndInfoIt =
_clients.find(clientHandle);
1186 if (clientHandleAndInfoIt ==
_clients.end()) {
1190 const auto& client = clientHandleAndInfoIt->second;
1191 const auto& subs = client.subscriptionsByChannel.find(chanId);
1192 if (subs == client.subscriptionsByChannel.end()) {
1195 subId = subs->second;
1198 std::array<uint8_t, 1 + 4 + 8> msgHeader;
1203 const size_t messageSize = msgHeader.size() + payloadSize;
1204 auto message = con->get_message(OpCode::BINARY, messageSize);
1207 message->set_payload(msgHeader.data(), msgHeader.size());
1208 message->append_payload(payload, payloadSize);
1212 template <
typename ServerConfiguration>
1214 std::array<uint8_t, 1 + 8> message;
1219 for (
const auto& [hdl, clientInfo] :
_clients) {
1221 sendBinary(hdl, message.data(), message.size());
1225 template <
typename ServerConfiguration>
1228 std::vector<uint8_t> payload(1 + response.
size());
1230 response.
write(payload.data() + 1);
1231 sendBinary(clientHandle, payload.data(), payload.size());
1234 template <
typename ServerConfiguration>
1237 auto endpoint =
_server.get_local_endpoint(ec);
1239 throw std::runtime_error(
"Server not listening on any port. Has it been started before?");
1241 return endpoint.port();
1244 template <
typename ServerConfiguration>
1248 json::array_t publisherDiff, subscriberDiff, servicesDiff;
1249 std::unordered_set<std::string> topicNames, serviceNames;
1250 std::unordered_set<std::string> knownTopicNames, knownServiceNames;
1253 for (
const auto& [name, publisherIds] : publishedTopics) {
1257 publisherDiff.push_back(
nlohmann::json{{
"name", name}, {
"publisherIds", publisherIds}});
1259 topicNames.insert(name);
1261 for (
const auto& [name, subscriberIds] : subscribedTopics) {
1265 subscriberDiff.push_back(
nlohmann::json{{
"name", name}, {
"subscriberIds", subscriberIds}});
1267 topicNames.insert(name);
1269 for (
const auto& [name, providerIds] : advertisedServices) {
1273 servicesDiff.push_back(
nlohmann::json{{
"name", name}, {
"providerIds", providerIds}});
1275 serviceNames.insert(name);
1279 knownTopicNames.insert(nameWithIds.first);
1282 knownTopicNames.insert(nameWithIds.first);
1285 knownServiceNames.insert(nameWithIds.first);
1293 std::vector<std::string> removedTopics, removedServices;
1294 std::copy_if(knownTopicNames.begin(), knownTopicNames.end(), std::back_inserter(removedTopics),
1295 [&topicNames](
const std::string& topic) {
1296 return topicNames.find(topic) == topicNames.end();
1298 std::copy_if(knownServiceNames.begin(), knownServiceNames.end(),
1299 std::back_inserter(removedServices), [&serviceNames](
const std::string& service) {
1300 return serviceNames.find(service) == serviceNames.end();
1303 if (publisherDiff.empty() && subscriberDiff.empty() && servicesDiff.empty() &&
1304 removedTopics.empty() && removedServices.empty()) {
1309 {
"op",
"connectionGraphUpdate"}, {
"publishedTopics", publisherDiff},
1310 {
"subscribedTopics", subscriberDiff}, {
"advertisedServices", servicesDiff},
1311 {
"removedTopics", removedTopics}, {
"removedServices", removedServices},
1313 const auto payload = msg.dump();
1315 std::shared_lock<std::shared_mutex> clientsLock(
_clientsMutex);
1316 for (
const auto& [hdl, clientInfo] :
_clients) {
1317 if (clientInfo.subscribedToConnectionGraph) {
1318 _server.send(hdl, payload, OpCode::TEXT);
1323 template <
typename ServerConfiguration>
1326 const auto con =
_server.get_con_from_hdl(clientHandle, ec);
1327 return con ? con->get_remote_endpoint() :
"(unknown)";
1330 template <
typename ServerConfiguration>
1333 [paramName](
const auto& paramSubscriptions) {
1334 return paramSubscriptions.second.find(paramName) !=
1335 paramSubscriptions.second.end();
1339 template <
typename ServerConfiguration>
1341 ConnHandle hdl,
const std::unordered_set<std::string>& paramNames) {
1342 std::vector<std::string> paramsToUnsubscribe;
1345 std::copy_if(paramNames.begin(), paramNames.end(), std::back_inserter(paramsToUnsubscribe),
1346 [
this](
const std::string& paramName) {
1352 for (
const auto&
param : paramsToUnsubscribe) {
1353 _server.get_alog().write(APP,
"Unsubscribing from parameter '" +
param +
"'.");
1359 }
catch (
const std::exception& e) {
1363 "Failed to unsubscribe from one more more parameters");
1368 template <
typename ServerConfiguration>
ROSCPP_DECL uint32_t getPort()
bool hasCapability(const std::string &capability) const
void sendStatusAndLogMsg(ConnHandle clientHandle, const StatusLevel level, const std::string &message)
bool param(const std::string ¶m_name, T ¶m_val, const T &default_val)
struct foxglove::Server::@0 _connectionGraph
void handleConnectionClosed(ConnHandle hdl)
constexpr char SUPPORTED_SUBPROTOCOL[]
uint16_t getPort() override
std::function< void(const std::vector< std::string > &, const std::optional< std::string > &, ConnectionHandle)> parameterRequestHandler
void updateParameterValues(const std::vector< Parameter > ¶meters) override
static const websocketpp::log::level WARNING
std::function< void(ChannelId, ConnectionHandle)> unsubscribeHandler
const std::unordered_map< ClientBinaryOpcode, std::string > CAPABILITY_BY_CLIENT_BINARY_OPERATION
Map of required capability by client operation (binary).
void unsubscribeParamsWithoutSubscriptions(ConnHandle hdl, const std::unordered_set< std::string > ¶mNames)
std::vector< std::regex > clientTopicWhitelistPatterns
websocketpp::lib::asio::ip::tcp Tcp
void write(uint8_t *data) const
void read(const uint8_t *data, size_t size)
void handleTextMessage(ConnHandle hdl, MessagePtr msg)
bool isWhitelisted(const std::string &name, const std::vector< std::regex > ®exPatterns)
static const websocketpp::log::level APP
void sendJsonRaw(ConnHandle hdl, const std::string &payload)
void sendMessage(ConnHandle clientHandle, ChannelId chanId, uint64_t timestamp, const uint8_t *payload, size_t payloadSize) override
std::shared_mutex _clientChannelsMutex
websocketpp::connection< ServerConfiguration > ConnectionType
void handleMessage(ConnHandle hdl, MessagePtr msg)
void socketInit(ConnHandle hdl)
constexpr char CAPABILITY_CLIENT_PUBLISH[]
std::shared_mutex _connectionGraphMutex
std::unordered_map< std::string, std::string > metadata
ClientInfo(const std::string &name, ConnHandle handle)
std::function< void(const ServiceRequest &, ConnectionHandle)> serviceRequestHandler
std::function< void(const ClientAdvertisement &, ConnectionHandle)> clientAdvertiseHandler
bool isParameterSubscribed(const std::string ¶mName) const
std::shared_mutex _servicesMutex
std::function< void(WebSocketLogLevel, char const *)> LogCallback
std::string remoteEndpointString(ConnHandle clientHandle) override
std::function< void(ChannelId, ConnectionHandle)> subscribeHandler
MapOfSets subscribedTopics
std::function< void(const std::vector< Parameter > &, const std::optional< std::string > &, ConnectionHandle)> parameterChangeHandler
constexpr uint32_t Integer(const std::string_view str)
std::unordered_map< std::string, std::unordered_set< std::string > > MapOfSets
constexpr char CAPABILITY_PARAMETERS_SUBSCRIBE[]
std::shared_mutex _channelsMutex
websocketpp::frame::opcode::value OpCode
void sendJson(ConnHandle hdl, json &&payload)
std::unordered_map< ChannelId, SubscriptionId > subscriptionsByChannel
typename ServerType::message_ptr MessagePtr
const std::unordered_map< std::string, std::string > CAPABILITY_BY_CLIENT_OPERATION
Map of required capability by client operation (text).
MapOfSets advertisedServices
std::function< void(ClientChannelId, ConnectionHandle)> clientUnadvertiseHandler
void handleBinaryMessage(ConnHandle hdl, MessagePtr msg)
constexpr char CAPABILITY_CONNECTION_GRAPH[]
void updateConnectionGraph(const MapOfSets &publishedTopics, const MapOfSets &subscribedTopics, const MapOfSets &advertisedServices) override
constexpr websocketpp::log::level StatusLevelToLogLevel(StatusLevel level)
std::vector< ChannelId > addChannels(const std::vector< ChannelWithoutId > &channels) override
void publishParameterValues(ConnHandle clientHandle, const std::vector< Parameter > ¶meters, const std::optional< std::string > &requestId=std::nullopt) override
websocketpp::server< ServerConfiguration > ServerType
std::unordered_map< ChannelId, Channel > _channels
std::map< ConnHandle, std::unordered_set< std::string >, std::owner_less<> > _clientParamSubscriptions
std::unordered_set< ClientChannelId > advertisedChannels
std::unique_ptr< std::thread > _serverThread
std::unique_ptr< CallbackQueue > _handlerCallbackQueue
void broadcastTime(uint64_t timestamp) override
void removeServices(const std::vector< ServiceId > &serviceIds) override
std::function< void(bool)> subscribeConnectionGraphHandler
void removeChannels(const std::vector< ChannelId > &channelIds) override
#define FOXGLOVE_DEBOUNCE(f, ms)
void sendServiceResponse(ConnHandle clientHandle, const ServiceResponse &response) override
bool validateConnection(ConnHandle hdl)
void setHandlers(ServerHandlers< ConnHandle > &&handlers) override
constexpr char CAPABILITY_SERVICES[]
void sendBinary(ConnHandle hdl, const uint8_t *payload, size_t payloadSize)
size_t sendBufferLimitBytes
std::vector< std::string > capabilities
static const websocketpp::log::level RECOVERABLE
std::string IPAddressToString(const asio::ip::address &addr)
std::map< ConnHandle, std::unordered_map< ClientChannelId, ClientAdvertisement >, std::owner_less<> > _clientChannels
websocketpp::connection_hdl ConnHandle
MapOfSets publishedTopics
std::shared_mutex _clientsMutex
void start(const std::string &host, uint16_t port) override
std::unordered_map< ServiceId, ServiceWithoutId > _services
constexpr char CAPABILITY_PARAMETERS[]
void handleConnectionOpened(ConnHandle hdl)
ClientChannelId channelId
std::mutex _clientParamSubscriptionsMutex
Server(std::string name, LogCallback logger, const ServerOptions &options)
std::vector< ServiceId > addServices(const std::vector< ServiceWithoutId > &services) override
std::map< ConnHandle, ClientInfo, std::owner_less<> > _clients
std::function< void(const ClientMessage &, ConnectionHandle)> clientMessageHandler
std::function< void(const std::vector< std::string > &, ParameterSubscriptionOperation, ConnectionHandle)> parameterSubscriptionHandler
void WriteUint64LE(uint8_t *buf, uint64_t val)
ServerHandlers< ConnHandle > _handlers
std::vector< std::string > supportedEncodings
void WriteUint32LE(uint8_t *buf, uint32_t val)