1 #include <unordered_set>
3 #include <resource_retriever/retriever.hpp>
9 inline bool isHiddenTopicOrService(
const std::string& name) {
11 throw std::invalid_argument(
"Topic or service name can't be empty");
13 return name.front() ==
'_' || name.find(
"/_") != std::string::npos;
17 using namespace std::chrono_literals;
18 using namespace std::placeholders;
22 : Node(
"foxglove_bridge", options) {
23 const char* rosDistro = std::getenv(
"ROS_DISTRO");
24 RCLCPP_INFO(this->get_logger(),
"Starting foxglove_bridge (%s, %s@%s) with %s", rosDistro,
30 const auto port =
static_cast<uint16_t
>(this->get_parameter(
PARAM_PORT).as_int());
31 const auto address = this->get_parameter(
PARAM_ADDRESS).as_string();
32 const auto send_buffer_limit =
34 const auto useTLS = this->get_parameter(
PARAM_USETLS).as_bool();
35 const auto certfile = this->get_parameter(
PARAM_CERTFILE).as_string();
36 const auto keyfile = this->get_parameter(
PARAM_KEYFILE).as_string();
39 const auto bestEffortQosTopicWhiteList =
49 _useSimTime = this->get_parameter(
"use_sim_time").as_bool();
51 const auto clientTopicWhiteList =
53 const auto clientTopicWhiteListPatterns =
parseRegexStrings(
this, clientTopicWhiteList);
69 serverOptions.
metadata = {{
"ROS_DISTRO", rosDistro}};
71 serverOptions.
sessionId = std::to_string(std::time(
nullptr));
73 serverOptions.
useTls = useTLS;
75 serverOptions.
keyfile = keyfile;
78 _server = foxglove::ServerFactory::createServer<ConnectionHandle>(
"foxglove_bridge",
logHandler,
98 _paramInterface = std::make_shared<ParameterInterface>(
this, paramWhitelistPatterns);
110 _server->setHandlers(std::move(hdlrs));
114 uint16_t listeningPort =
_server->getPort();
115 if (port != listeningPort) {
116 RCLCPP_DEBUG(this->get_logger(),
"Reassigning \"port\" parameter from %d to %d", port,
118 this->set_parameter(rclcpp::Parameter{
PARAM_PORT, listeningPort});
127 this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
132 "/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
133 [&](std::shared_ptr<const rosgraph_msgs::msg::Clock> msg) {
134 const auto timestamp = rclcpp::Time{msg->clock}.nanoseconds();
135 assert(timestamp >= 0 &&
"Timestamp is negative");
136 _server->broadcastTime(
static_cast<uint64_t
>(timestamp));
143 RCLCPP_INFO(this->get_logger(),
"Shutting down %s", this->get_name());
148 RCLCPP_INFO(this->get_logger(),
"Shutdown complete");
155 auto graphEvent = this->get_graph_event();
158 this->wait_for_graph_change(graphEvent, 200ms);
159 bool triggered = graphEvent->check_and_clear();
161 RCLCPP_DEBUG(this->get_logger(),
"rosgraph change detected");
162 const auto topicNamesAndTypes = get_topic_names_and_types();
169 std::this_thread::sleep_for(500ms);
171 }
catch (
const std::exception& ex) {
172 RCLCPP_ERROR(this->get_logger(),
"Exception thrown in rosgraphPollThread: %s", ex.what());
176 RCLCPP_DEBUG(this->get_logger(),
"rosgraph polling thread exiting");
180 const std::map<std::string, std::vector<std::string>>& topicNamesAndTypes) {
185 std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
186 latestTopics.reserve(topicNamesAndTypes.size());
187 for (
const auto& topicNamesAndType : topicNamesAndTypes) {
188 const auto& topicName = topicNamesAndType.first;
189 const auto& datatypes = topicNamesAndType.second;
198 for (
const auto&
datatype : datatypes) {
199 latestTopics.emplace(topicName,
datatype);
204 if (
const auto numIgnoredTopics = topicNamesAndTypes.size() - latestTopics.size()) {
207 "%zu topics have been ignored as they do not match any pattern on the topic whitelist",
214 std::vector<foxglove::ChannelId> channelIdsToRemove;
217 channelIt->second.schemaName};
218 if (latestTopics.find(topicAndDatatype) == latestTopics.end()) {
219 const auto channelId = channelIt->first;
220 channelIdsToRemove.push_back(channelId);
222 RCLCPP_INFO(this->get_logger(),
"Removed channel %d for topic \"%s\" (%s)", channelId,
223 topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
229 _server->removeChannels(channelIdsToRemove);
232 std::vector<foxglove::ChannelWithoutId> channelsToAdd;
233 for (
const auto& topicAndDatatype : latestTopics) {
235 [topicAndDatatype](
const auto& channelIdAndChannel) {
236 const auto& channel = channelIdAndChannel.second;
237 return channel.topic == topicAndDatatype.first &&
238 channel.schemaName == topicAndDatatype.second;
244 newChannel.
topic = topicAndDatatype.first;
245 newChannel.schemaName = topicAndDatatype.second;
251 newChannel.encoding =
"cdr";
252 newChannel.schema = schema;
253 newChannel.schemaEncoding =
"ros2msg";
256 newChannel.encoding =
"cdr";
257 newChannel.schema = schema;
258 newChannel.schemaEncoding =
"ros2idl";
263 RCLCPP_WARN(this->get_logger(),
"Could not find definition for type %s: %s",
264 topicAndDatatype.second.c_str(), err.
what());
266 newChannel.schema =
"";
267 }
catch (
const std::exception& err) {
268 RCLCPP_WARN(this->get_logger(),
"Failed to add channel for topic \"%s\" (%s): %s",
269 topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what());
272 channelsToAdd.push_back(newChannel);
275 const auto channelIds =
_server->addChannels(channelsToAdd);
276 for (
size_t i = 0; i < channelsToAdd.size(); ++i) {
277 const auto channelId = channelIds[i];
278 const auto& channel = channelsToAdd[i];
280 RCLCPP_DEBUG(this->get_logger(),
"Advertising channel %d for topic \"%s\" (%s)", channelId,
281 channel.topic.c_str(), channel.schemaName.c_str());
293 const auto serviceNamesAndTypes = this->get_node_graph_interface()->get_service_names_and_types();
298 std::vector<foxglove::ServiceId> servicesToRemove;
300 const auto it = std::find_if(serviceNamesAndTypes.begin(), serviceNamesAndTypes.end(),
301 [service](
const auto& serviceNameAndTypes) {
302 return serviceNameAndTypes.first == service.second.name;
304 if (it == serviceNamesAndTypes.end()) {
305 servicesToRemove.push_back(service.first);
308 for (
auto serviceId : servicesToRemove) {
311 _server->removeServices(servicesToRemove);
314 std::vector<foxglove::ServiceWithoutId> newServices;
315 for (
const auto& serviceNamesAndType : serviceNamesAndTypes) {
316 const auto& serviceName = serviceNamesAndType.first;
317 const auto& datatypes = serviceNamesAndType.second;
321 [serviceName](
const auto& idWithService) {
322 return idWithService.second.name == serviceName;
338 service.
name = serviceName;
339 service.
type = datatypes.front();
352 RCLCPP_WARN(this->get_logger(),
353 "IDL message definition format cannot be communicated over ws-protocol. "
354 "Service \"%s\" (%s) may not decode correctly in clients",
355 service.
name.c_str(), service.
type.c_str());
361 RCLCPP_WARN(this->get_logger(),
"Could not find definition for type %s: %s",
366 }
catch (
const std::exception& err) {
367 RCLCPP_WARN(this->get_logger(),
"Failed to add service \"%s\" (%s): %s", service.
name.c_str(),
368 service.
type.c_str(), err.what());
372 newServices.push_back(service);
375 const auto serviceIds =
_server->addServices(newServices);
376 for (
size_t i = 0; i < serviceIds.size(); ++i) {
382 const std::map<std::string, std::vector<std::string>>& topicNamesAndTypes) {
385 for (
const auto& topicNameAndType : topicNamesAndTypes) {
386 const auto& topicName = topicNameAndType.first;
391 const auto publishersInfo = get_publishers_info_by_topic(topicName);
392 const auto subscribersInfo = get_subscriptions_info_by_topic(topicName);
393 std::unordered_set<std::string> publisherIds, subscriberIds;
394 for (
const auto& publisher : publishersInfo) {
395 const auto& ns = publisher.node_namespace();
396 const auto sep = (!ns.empty() && ns.back() ==
'/') ?
"" :
"/";
397 publisherIds.insert(ns + sep + publisher.node_name());
399 for (
const auto& subscriber : subscribersInfo) {
400 const auto& ns = subscriber.node_namespace();
401 const auto sep = (!ns.empty() && ns.back() ==
'/') ?
"" :
"/";
402 subscriberIds.insert(ns + sep + subscriber.node_name());
404 publishers.emplace(topicName, publisherIds);
405 subscribers.emplace(topicName, subscriberIds);
409 for (
const auto& fqnNodeName : get_node_names()) {
411 const auto serviceNamesAndTypes = get_service_names_and_types_by_node(nodeName, nodeNs);
413 for (
const auto& [serviceName, serviceTypes] : serviceNamesAndTypes) {
416 services[serviceName].insert(fqnNodeName);
421 _server->updateConnectionGraph(publishers, subscribers, services);
435 channelId,
"Received subscribe request for unknown channel " + std::to_string(channelId));
438 const auto& channel = it->second;
439 const auto& topic = channel.topic;
440 const auto&
datatype = channel.schemaName;
443 auto [subscriptionsIt, firstSubscription] =
445 auto& subscriptionsByClient = subscriptionsIt->second;
447 if (!firstSubscription &&
448 subscriptionsByClient.find(clientHandle) != subscriptionsByClient.end()) {
450 channelId,
"Client is already subscribed to channel " + std::to_string(channelId));
453 rclcpp::SubscriptionEventCallbacks eventCallbacks;
454 eventCallbacks.incompatible_qos_callback = [&](
const rclcpp::QOSRequestedIncompatibleQoSInfo&) {
455 RCLCPP_ERROR(this->get_logger(),
"Incompatible subscriber QoS settings for topic \"%s\" (%s)",
459 rclcpp::SubscriptionOptions subscriptionOptions;
460 subscriptionOptions.event_callbacks = eventCallbacks;
467 size_t reliabilityReliableEndpointsCount = 0;
468 size_t durabilityTransientLocalEndpointsCount = 0;
470 const auto publisherInfo = this->get_publishers_info_by_topic(topic);
471 for (
const auto& publisher : publisherInfo) {
472 const auto& qos = publisher.qos_profile();
473 if (qos.reliability() == rclcpp::ReliabilityPolicy::Reliable) {
474 ++reliabilityReliableEndpointsCount;
476 if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
477 ++durabilityTransientLocalEndpointsCount;
486 const size_t publisherHistoryDepth = std::max(
static_cast<size_t>(1), qos.depth());
487 depth = depth + publisherHistoryDepth;
492 RCLCPP_WARN(this->get_logger(),
493 "Limiting history depth for topic '%s' to %zu (was %zu). You may want to increase "
494 "the max_qos_depth parameter value.",
499 rclcpp::QoS qos{rclcpp::KeepLast(depth)};
504 }
else if (!publisherInfo.empty() && reliabilityReliableEndpointsCount == publisherInfo.size()) {
508 if (reliabilityReliableEndpointsCount > 0) {
511 "Some, but not all, publishers on topic '%s' are offering QoSReliabilityPolicy.RELIABLE. "
512 "Falling back to QoSReliabilityPolicy.BEST_EFFORT as it will connect to all publishers",
519 if (!publisherInfo.empty() && durabilityTransientLocalEndpointsCount == publisherInfo.size()) {
520 qos.transient_local();
522 if (durabilityTransientLocalEndpointsCount > 0) {
523 RCLCPP_WARN(this->get_logger(),
524 "Some, but not all, publishers on topic '%s' are offering "
525 "QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to "
526 "QoSDurabilityPolicy.VOLATILE as it will connect to all publishers",
529 qos.durability_volatile();
532 if (firstSubscription) {
534 this->get_logger(),
"Subscribing to topic \"%s\" (%s) on channel %d with reliablity \"%s\"",
535 topic.c_str(),
datatype.c_str(), channelId,
536 qos.reliability() == rclcpp::ReliabilityPolicy::Reliable ?
"reliable" :
"best_effort");
539 RCLCPP_INFO(this->get_logger(),
"Adding subscriber #%zu to topic \"%s\" (%s) on channel %d",
540 subscriptionsByClient.size(), topic.c_str(),
datatype.c_str(), channelId);
544 auto subscriber = this->create_generic_subscription(
545 topic, datatype, qos,
546 [
this, channelId, clientHandle](std::shared_ptr<const rclcpp::SerializedMessage> msg) {
549 subscriptionOptions);
550 subscriptionsByClient.emplace(clientHandle, std::move(subscriber));
551 }
catch (
const std::exception& ex) {
553 channelId,
"Failed to subscribe to topic " + topic +
" (" + datatype +
"): " + ex.what());
563 channelId,
"Received unsubscribe request for unknown channel " + std::to_string(channelId));
565 const auto& channel = channelIt->second;
570 std::to_string(channelId) +
571 " that was not subscribed to");
574 auto& subscriptionsByClient = subscriptionsIt->second;
575 const auto clientSubscription = subscriptionsByClient.find(clientHandle);
576 if (clientSubscription == subscriptionsByClient.end()) {
578 channelId,
"Received unsubscribe request for channel " + std::to_string(channelId) +
579 "from a client that was not subscribed to this channel");
582 subscriptionsByClient.erase(clientSubscription);
583 if (subscriptionsByClient.empty()) {
584 RCLCPP_INFO(this->get_logger(),
"Unsubscribing from topic \"%s\" (%s) on channel %d",
585 channel.topic.c_str(), channel.schemaName.c_str(), channelId);
588 RCLCPP_INFO(this->get_logger(),
589 "Removed one subscription from channel %d (%zu subscription(s) left)", channelId,
590 subscriptionsByClient.size());
599 auto [clientPublicationsIt, isFirstPublication] =
602 auto& clientPublications = clientPublicationsIt->second;
604 if (!isFirstPublication &&
605 clientPublications.find(advertisement.
channelId) != clientPublications.end()) {
608 "Received client advertisement from " +
_server->remoteEndpointString(hdl) +
" for channel " +
609 std::to_string(advertisement.
channelId) +
" it had already advertised");
615 "Received client advertisement from " +
_server->remoteEndpointString(hdl) +
" for channel " +
616 std::to_string(advertisement.
channelId) +
" with empty schema name");
619 if (advertisement.
encoding ==
"json") {
623 const auto& schemaName = advertisement.
schemaName;
624 std::string schema =
"";
626 if (!advertisement.
schema.empty()) {
628 schema = std::string(
reinterpret_cast<const char*
>(advertisement.
schema.data()),
629 advertisement.
schema.size());
636 "Message definition (.msg) for schema " + schemaName +
" not found.");
639 schema = msgDefinition;
642 auto parser = std::make_shared<RosMsgParser::Parser>(
643 advertisement.
topic, RosMsgParser::ROSType(schemaName), schema);
650 const auto& topicName = advertisement.
topic;
651 const auto& topicType = advertisement.
schemaName;
655 const auto otherPublishers = get_publishers_info_by_topic(topicName);
656 const auto otherPublisherIt =
657 std::find_if(otherPublishers.begin(), otherPublishers.end(),
658 [
this](
const rclcpp::TopicEndpointInfo& endpoint) {
659 return endpoint.node_name() != this->get_name() ||
660 endpoint.node_namespace() != this->get_namespace();
662 rclcpp::QoS qos = otherPublisherIt == otherPublishers.end() ? rclcpp::SystemDefaultsQoS()
663 : otherPublisherIt->qos_profile();
668 if (qos.history() == rclcpp::HistoryPolicy::Unknown) {
669 qos.history(rclcpp::HistoryPolicy::SystemDefault);
671 rclcpp::PublisherOptions publisherOptions{};
673 auto publisher = this->create_generic_publisher(topicName, topicType, qos, publisherOptions);
675 RCLCPP_INFO(this->get_logger(),
676 "Client %s is advertising \"%s\" (%s) on channel %d with encoding \"%s\"",
677 _server->remoteEndpointString(hdl).c_str(), topicName.c_str(), topicType.c_str(),
681 clientPublications.emplace(advertisement.
channelId, std::move(publisher));
682 }
catch (
const std::exception& ex) {
684 std::string(
"Failed to create publisher: ") + ex.what());
694 channelId,
"Ignoring client unadvertisement from " +
_server->remoteEndpointString(hdl) +
695 " for unknown channel " + std::to_string(channelId) +
696 ", client has no advertised topics");
699 auto& clientPublications = it->second;
700 auto it2 = clientPublications.find(channelId);
701 if (it2 == clientPublications.end()) {
703 channelId,
"Ignoring client unadvertisement from " +
_server->remoteEndpointString(hdl) +
704 " for unknown channel " + std::to_string(channelId) +
", client has " +
705 std::to_string(clientPublications.size()) +
" advertised topic(s)");
708 const auto& publisher = it2->second;
709 RCLCPP_INFO(this->get_logger(),
710 "Client %s is no longer advertising %s (%zu subscribers) on channel %d",
711 _server->remoteEndpointString(hdl).c_str(), publisher->get_topic_name(),
712 publisher->get_subscription_count(), channelId);
714 clientPublications.erase(it2);
715 if (clientPublications.empty()) {
722 this->create_wall_timer(1s, []() {});
727 rclcpp::GenericPublisher::SharedPtr publisher;
735 channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
736 " for unknown channel " + std::to_string(channelId) +
737 ", client has no advertised topics");
740 auto& clientPublications = it->second;
741 auto it2 = clientPublications.find(channelId);
742 if (it2 == clientPublications.end()) {
744 channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
745 " for unknown channel " + std::to_string(channelId) +
", client has " +
746 std::to_string(clientPublications.size()) +
" advertised topic(s)");
748 publisher = it2->second;
751 auto publishMessage = [publisher,
this](
const void* data,
size_t size) {
753 rclcpp::SerializedMessage serializedMessage{size};
754 auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
755 std::memcpy(rclSerializedMsg.buffer, data, size);
756 rclSerializedMsg.buffer_length = size;
759 publisher->publish(serializedMessage);
761 publisher->publish_as_loaned_msg(serializedMessage);
769 std::shared_ptr<RosMsgParser::Parser>
parser;
774 parser = parserIt->second;
779 "Dropping client message from " +
780 _server->remoteEndpointString(hdl) +
781 " with encoding \"json\": no parser found");
783 thread_local RosMsgParser::ROS2_Serializer serializer;
785 const std::string jsonMessage(
reinterpret_cast<const char*
>(message.
getData()),
788 parser->serializeFromJson(jsonMessage, &serializer);
789 publishMessage(serializer.getBufferData(), serializer.getBufferSize());
790 }
catch (
const std::exception& ex) {
792 "Dropping client message from " +
793 _server->remoteEndpointString(hdl) +
794 " with encoding \"json\": " + ex.what());
800 "Dropping client message from " +
_server->remoteEndpointString(hdl) +
806 const std::optional<std::string>& requestId,
812 std::vector<std::string> parameterNames(parameters.size());
813 for (
size_t i = 0; i < parameters.size(); ++i) {
814 parameterNames[i] = parameters[i].getName();
821 const std::optional<std::string>& requestId,
823 const auto params =
_paramInterface->getParams(parameters, std::chrono::seconds(5));
824 _server->publishParameterValues(hdl, params, requestId);
838 _server->updateParameterValues(parameters);
843 case LogLevel::Debug:
844 RCLCPP_DEBUG(this->get_logger(),
"[WS] %s", msg);
847 RCLCPP_INFO(this->get_logger(),
"[WS] %s", msg);
850 RCLCPP_WARN(this->get_logger(),
"[WS] %s", msg);
852 case LogLevel::Error:
853 RCLCPP_ERROR(this->get_logger(),
"[WS] %s", msg);
855 case LogLevel::Critical:
856 RCLCPP_FATAL(this->get_logger(),
"[WS] %s", msg);
863 std::shared_ptr<const rclcpp::SerializedMessage> msg) {
866 const auto timestamp = this->now().nanoseconds();
867 assert(timestamp >= 0 &&
"Timestamp is negative");
868 const auto rclSerializedMsg = msg->get_rcl_serialized_message();
869 _server->sendMessage(clientHandle, channelId,
static_cast<uint64_t
>(timestamp),
870 rclSerializedMsg.buffer, rclSerializedMsg.buffer_length);
875 RCLCPP_DEBUG(this->get_logger(),
"Received a request for service %d", request.
serviceId);
882 "Service with id " + std::to_string(request.
serviceId) +
" does not exist");
888 auto clientOptions = rcl_client_get_default_options();
889 auto genClient = GenericClient::make_shared(
890 this->get_node_base_interface().
get(), this->get_node_graph_interface(),
891 serviceIt->second.name, serviceIt->second.type, clientOptions);
894 }
catch (
const std::exception& ex) {
897 "Failed to create service client for service " + serviceIt->second.name +
": " + ex.what());
901 auto client = clientIt->second;
902 if (!client->wait_for_service(1s)) {
904 "Service " + serviceIt->second.name +
" is not available");
907 auto reqMessage = std::make_shared<rclcpp::SerializedMessage>(request.
data.size());
908 auto& rclSerializedMsg = reqMessage->get_rcl_serialized_message();
909 std::memcpy(rclSerializedMsg.buffer, request.
data.data(), request.
data.size());
910 rclSerializedMsg.buffer_length = request.
data.size();
912 auto responseReceivedCallback = [
this, request,
914 const auto serializedResponseMsg = future.get()->get_rcl_serialized_message();
916 std::vector<uint8_t>(serializedResponseMsg.buffer_length)};
917 std::memcpy(
response.data.data(), serializedResponseMsg.buffer,
918 serializedResponseMsg.buffer_length);
919 _server->sendServiceResponse(clientHandle, response);
921 client->async_send_request(reqMessage, responseReceivedCallback);
936 throw std::runtime_error(
"Asset URI not allowed: " + uri);
944 std::memcpy(
response.data.data(), memoryResource.
data.get(), memoryResource.
size);
945 }
catch (
const std::exception& ex) {
946 RCLCPP_WARN(this->get_logger(),
"Failed to retrieve asset '%s': %s", uri.c_str(), ex.what());
948 response.errorMessage =
"Failed to retrieve asset " + uri;
952 _server->sendFetchAssetResponse(clientHandle, response);
962 #include <rclcpp_components/register_node_macro.hpp>