1 #include <unordered_set>
3 #include <resource_retriever/retriever.hpp>
9 inline bool isHiddenTopicOrService(
const std::string& name) {
11 throw std::invalid_argument(
"Topic or service name can't be empty");
13 return name.front() ==
'_' || name.find(
"/_") != std::string::npos;
17 using namespace std::chrono_literals;
18 using namespace std::placeholders;
22 : Node(
"foxglove_bridge", options) {
23 const char* rosDistro = std::getenv(
"ROS_DISTRO");
24 RCLCPP_INFO(this->get_logger(),
"Starting foxglove_bridge (%s, %s@%s) with %s", rosDistro,
30 const auto port =
static_cast<uint16_t
>(this->get_parameter(
PARAM_PORT).as_int());
31 const auto address = this->get_parameter(
PARAM_ADDRESS).as_string();
32 const auto send_buffer_limit =
34 const auto useTLS = this->get_parameter(
PARAM_USETLS).as_bool();
35 const auto certfile = this->get_parameter(
PARAM_CERTFILE).as_string();
36 const auto keyfile = this->get_parameter(
PARAM_KEYFILE).as_string();
39 const auto bestEffortQosTopicWhiteList =
49 _useSimTime = this->get_parameter(
"use_sim_time").as_bool();
51 const auto clientTopicWhiteList =
53 const auto clientTopicWhiteListPatterns =
parseRegexStrings(
this, clientTopicWhiteList);
58 const auto ignoreUnresponsiveParamNodes =
71 serverOptions.
metadata = {{
"ROS_DISTRO", rosDistro}};
73 serverOptions.
sessionId = std::to_string(std::time(
nullptr));
75 serverOptions.
useTls = useTLS;
77 serverOptions.
keyfile = keyfile;
80 _server = foxglove::ServerFactory::createServer<ConnectionHandle>(
"foxglove_bridge",
logHandler,
100 _paramInterface = std::make_shared<ParameterInterface>(
this, paramWhitelistPatterns,
101 ignoreUnresponsiveParamNodes
115 _server->setHandlers(std::move(hdlrs));
119 uint16_t listeningPort =
_server->getPort();
120 if (port != listeningPort) {
121 RCLCPP_DEBUG(this->get_logger(),
"Reassigning \"port\" parameter from %d to %d", port,
123 this->set_parameter(rclcpp::Parameter{
PARAM_PORT, listeningPort});
132 this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
137 "/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
138 [&](std::shared_ptr<const rosgraph_msgs::msg::Clock> msg) {
139 const auto timestamp = rclcpp::Time{msg->clock}.nanoseconds();
140 assert(timestamp >= 0 &&
"Timestamp is negative");
141 _server->broadcastTime(
static_cast<uint64_t
>(timestamp));
148 RCLCPP_INFO(this->get_logger(),
"Shutting down %s", this->get_name());
153 RCLCPP_INFO(this->get_logger(),
"Shutdown complete");
160 auto graphEvent = this->get_graph_event();
163 this->wait_for_graph_change(graphEvent, 200ms);
164 bool triggered = graphEvent->check_and_clear();
166 RCLCPP_DEBUG(this->get_logger(),
"rosgraph change detected");
167 const auto topicNamesAndTypes = get_topic_names_and_types();
174 std::this_thread::sleep_for(500ms);
176 }
catch (
const std::exception& ex) {
177 RCLCPP_ERROR(this->get_logger(),
"Exception thrown in rosgraphPollThread: %s", ex.what());
181 RCLCPP_DEBUG(this->get_logger(),
"rosgraph polling thread exiting");
185 const std::map<std::string, std::vector<std::string>>& topicNamesAndTypes) {
190 std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
191 latestTopics.reserve(topicNamesAndTypes.size());
192 for (
const auto& topicNamesAndType : topicNamesAndTypes) {
193 const auto& topicName = topicNamesAndType.first;
194 const auto& datatypes = topicNamesAndType.second;
203 for (
const auto&
datatype : datatypes) {
204 latestTopics.emplace(topicName,
datatype);
209 if (
const auto numIgnoredTopics = topicNamesAndTypes.size() - latestTopics.size()) {
212 "%zu topics have been ignored as they do not match any pattern on the topic whitelist",
219 std::vector<foxglove::ChannelId> channelIdsToRemove;
222 channelIt->second.schemaName};
223 if (latestTopics.find(topicAndDatatype) == latestTopics.end()) {
224 const auto channelId = channelIt->first;
225 channelIdsToRemove.push_back(channelId);
227 RCLCPP_INFO(this->get_logger(),
"Removed channel %d for topic \"%s\" (%s)", channelId,
228 topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
234 _server->removeChannels(channelIdsToRemove);
237 std::vector<foxglove::ChannelWithoutId> channelsToAdd;
238 for (
const auto& topicAndDatatype : latestTopics) {
240 [topicAndDatatype](
const auto& channelIdAndChannel) {
241 const auto& channel = channelIdAndChannel.second;
242 return channel.topic == topicAndDatatype.first &&
243 channel.schemaName == topicAndDatatype.second;
249 newChannel.
topic = topicAndDatatype.first;
250 newChannel.schemaName = topicAndDatatype.second;
256 newChannel.encoding =
"cdr";
257 newChannel.schema = schema;
258 newChannel.schemaEncoding =
"ros2msg";
261 newChannel.encoding =
"cdr";
262 newChannel.schema = schema;
263 newChannel.schemaEncoding =
"ros2idl";
268 RCLCPP_WARN(this->get_logger(),
"Could not find definition for type %s: %s",
269 topicAndDatatype.second.c_str(), err.
what());
271 newChannel.schema =
"";
272 }
catch (
const std::exception& err) {
273 RCLCPP_WARN(this->get_logger(),
"Failed to add channel for topic \"%s\" (%s): %s",
274 topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what());
277 channelsToAdd.push_back(newChannel);
280 const auto channelIds =
_server->addChannels(channelsToAdd);
281 for (
size_t i = 0; i < channelsToAdd.size(); ++i) {
282 const auto channelId = channelIds[i];
283 const auto& channel = channelsToAdd[i];
285 RCLCPP_DEBUG(this->get_logger(),
"Advertising channel %d for topic \"%s\" (%s)", channelId,
286 channel.topic.c_str(), channel.schemaName.c_str());
298 const auto serviceNamesAndTypes = this->get_node_graph_interface()->get_service_names_and_types();
303 std::vector<foxglove::ServiceId> servicesToRemove;
305 const auto it = std::find_if(serviceNamesAndTypes.begin(), serviceNamesAndTypes.end(),
306 [service](
const auto& serviceNameAndTypes) {
307 return serviceNameAndTypes.first == service.second.name;
309 if (it == serviceNamesAndTypes.end()) {
310 servicesToRemove.push_back(service.first);
313 for (
auto serviceId : servicesToRemove) {
316 _server->removeServices(servicesToRemove);
319 std::vector<foxglove::ServiceWithoutId> newServices;
320 for (
const auto& serviceNamesAndType : serviceNamesAndTypes) {
321 const auto& serviceName = serviceNamesAndType.first;
322 const auto& datatypes = serviceNamesAndType.second;
326 [serviceName](
const auto& idWithService) {
327 return idWithService.second.name == serviceName;
343 service.
name = serviceName;
344 service.
type = datatypes.front();
357 RCLCPP_WARN(this->get_logger(),
358 "IDL message definition format cannot be communicated over ws-protocol. "
359 "Service \"%s\" (%s) may not decode correctly in clients",
360 service.
name.c_str(), service.
type.c_str());
366 RCLCPP_WARN(this->get_logger(),
"Could not find definition for type %s: %s",
371 }
catch (
const std::exception& err) {
372 RCLCPP_WARN(this->get_logger(),
"Failed to add service \"%s\" (%s): %s", service.
name.c_str(),
373 service.
type.c_str(), err.what());
377 newServices.push_back(service);
380 const auto serviceIds =
_server->addServices(newServices);
381 for (
size_t i = 0; i < serviceIds.size(); ++i) {
387 const std::map<std::string, std::vector<std::string>>& topicNamesAndTypes) {
390 for (
const auto& topicNameAndType : topicNamesAndTypes) {
391 const auto& topicName = topicNameAndType.first;
396 const auto publishersInfo = get_publishers_info_by_topic(topicName);
397 const auto subscribersInfo = get_subscriptions_info_by_topic(topicName);
398 std::unordered_set<std::string> publisherIds, subscriberIds;
399 for (
const auto& publisher : publishersInfo) {
400 const auto& ns = publisher.node_namespace();
401 const auto sep = (!ns.empty() && ns.back() ==
'/') ?
"" :
"/";
402 publisherIds.insert(ns + sep + publisher.node_name());
404 for (
const auto& subscriber : subscribersInfo) {
405 const auto& ns = subscriber.node_namespace();
406 const auto sep = (!ns.empty() && ns.back() ==
'/') ?
"" :
"/";
407 subscriberIds.insert(ns + sep + subscriber.node_name());
409 publishers.emplace(topicName, publisherIds);
410 subscribers.emplace(topicName, subscriberIds);
414 for (
const auto& fqnNodeName : get_node_names()) {
416 const auto serviceNamesAndTypes = get_service_names_and_types_by_node(nodeName, nodeNs);
418 for (
const auto& [serviceName, serviceTypes] : serviceNamesAndTypes) {
421 services[serviceName].insert(fqnNodeName);
426 _server->updateConnectionGraph(publishers, subscribers, services);
440 channelId,
"Received subscribe request for unknown channel " + std::to_string(channelId));
443 const auto& channel = it->second;
444 const auto& topic = channel.topic;
445 const auto&
datatype = channel.schemaName;
448 auto [subscriptionsIt, firstSubscription] =
450 auto& subscriptionsByClient = subscriptionsIt->second;
452 if (!firstSubscription &&
453 subscriptionsByClient.find(clientHandle) != subscriptionsByClient.end()) {
455 channelId,
"Client is already subscribed to channel " + std::to_string(channelId));
458 rclcpp::SubscriptionEventCallbacks eventCallbacks;
459 eventCallbacks.incompatible_qos_callback = [&](
const rclcpp::QOSRequestedIncompatibleQoSInfo&) {
460 RCLCPP_ERROR(this->get_logger(),
"Incompatible subscriber QoS settings for topic \"%s\" (%s)",
464 rclcpp::SubscriptionOptions subscriptionOptions;
465 subscriptionOptions.event_callbacks = eventCallbacks;
472 size_t reliabilityReliableEndpointsCount = 0;
473 size_t durabilityTransientLocalEndpointsCount = 0;
475 const auto publisherInfo = this->get_publishers_info_by_topic(topic);
476 for (
const auto& publisher : publisherInfo) {
477 const auto& qos = publisher.qos_profile();
478 if (qos.reliability() == rclcpp::ReliabilityPolicy::Reliable) {
479 ++reliabilityReliableEndpointsCount;
481 if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
482 ++durabilityTransientLocalEndpointsCount;
491 const size_t publisherHistoryDepth = std::max(
static_cast<size_t>(1), qos.depth());
492 depth = depth + publisherHistoryDepth;
497 RCLCPP_WARN(this->get_logger(),
498 "Limiting history depth for topic '%s' to %zu (was %zu). You may want to increase "
499 "the max_qos_depth parameter value.",
504 rclcpp::QoS qos{rclcpp::KeepLast(depth)};
509 }
else if (!publisherInfo.empty() && reliabilityReliableEndpointsCount == publisherInfo.size()) {
513 if (reliabilityReliableEndpointsCount > 0) {
516 "Some, but not all, publishers on topic '%s' are offering QoSReliabilityPolicy.RELIABLE. "
517 "Falling back to QoSReliabilityPolicy.BEST_EFFORT as it will connect to all publishers",
524 if (!publisherInfo.empty() && durabilityTransientLocalEndpointsCount == publisherInfo.size()) {
525 qos.transient_local();
527 if (durabilityTransientLocalEndpointsCount > 0) {
528 RCLCPP_WARN(this->get_logger(),
529 "Some, but not all, publishers on topic '%s' are offering "
530 "QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to "
531 "QoSDurabilityPolicy.VOLATILE as it will connect to all publishers",
534 qos.durability_volatile();
537 if (firstSubscription) {
539 this->get_logger(),
"Subscribing to topic \"%s\" (%s) on channel %d with reliablity \"%s\"",
540 topic.c_str(),
datatype.c_str(), channelId,
541 qos.reliability() == rclcpp::ReliabilityPolicy::Reliable ?
"reliable" :
"best_effort");
544 RCLCPP_INFO(this->get_logger(),
"Adding subscriber #%zu to topic \"%s\" (%s) on channel %d",
545 subscriptionsByClient.size(), topic.c_str(),
datatype.c_str(), channelId);
549 auto subscriber = this->create_generic_subscription(
550 topic, datatype, qos,
551 [
this, channelId, clientHandle](std::shared_ptr<const rclcpp::SerializedMessage> msg) {
554 subscriptionOptions);
555 subscriptionsByClient.emplace(clientHandle, std::move(subscriber));
556 }
catch (
const std::exception& ex) {
558 channelId,
"Failed to subscribe to topic " + topic +
" (" + datatype +
"): " + ex.what());
568 channelId,
"Received unsubscribe request for unknown channel " + std::to_string(channelId));
570 const auto& channel = channelIt->second;
575 std::to_string(channelId) +
576 " that was not subscribed to");
579 auto& subscriptionsByClient = subscriptionsIt->second;
580 const auto clientSubscription = subscriptionsByClient.find(clientHandle);
581 if (clientSubscription == subscriptionsByClient.end()) {
583 channelId,
"Received unsubscribe request for channel " + std::to_string(channelId) +
584 "from a client that was not subscribed to this channel");
587 subscriptionsByClient.erase(clientSubscription);
588 if (subscriptionsByClient.empty()) {
589 RCLCPP_INFO(this->get_logger(),
"Unsubscribing from topic \"%s\" (%s) on channel %d",
590 channel.topic.c_str(), channel.schemaName.c_str(), channelId);
593 RCLCPP_INFO(this->get_logger(),
594 "Removed one subscription from channel %d (%zu subscription(s) left)", channelId,
595 subscriptionsByClient.size());
604 auto [clientPublicationsIt, isFirstPublication] =
607 auto& clientPublications = clientPublicationsIt->second;
609 if (!isFirstPublication &&
610 clientPublications.find(advertisement.
channelId) != clientPublications.end()) {
613 "Received client advertisement from " +
_server->remoteEndpointString(hdl) +
" for channel " +
614 std::to_string(advertisement.
channelId) +
" it had already advertised");
620 "Received client advertisement from " +
_server->remoteEndpointString(hdl) +
" for channel " +
621 std::to_string(advertisement.
channelId) +
" with empty schema name");
624 if (advertisement.
encoding ==
"json") {
628 const auto& schemaName = advertisement.
schemaName;
629 std::string schema =
"";
631 if (!advertisement.
schema.empty()) {
633 schema = std::string(
reinterpret_cast<const char*
>(advertisement.
schema.data()),
634 advertisement.
schema.size());
641 "Message definition (.msg) for schema " + schemaName +
" not found.");
644 schema = msgDefinition;
647 auto parser = std::make_shared<RosMsgParser::Parser>(
648 advertisement.
topic, RosMsgParser::ROSType(schemaName), schema);
655 const auto& topicName = advertisement.
topic;
656 const auto& topicType = advertisement.
schemaName;
660 const auto otherPublishers = get_publishers_info_by_topic(topicName);
661 const auto otherPublisherIt =
662 std::find_if(otherPublishers.begin(), otherPublishers.end(),
663 [
this](
const rclcpp::TopicEndpointInfo& endpoint) {
664 return endpoint.node_name() != this->get_name() ||
665 endpoint.node_namespace() != this->get_namespace();
667 rclcpp::QoS qos = otherPublisherIt == otherPublishers.end() ? rclcpp::SystemDefaultsQoS()
668 : otherPublisherIt->qos_profile();
673 if (qos.history() == rclcpp::HistoryPolicy::Unknown) {
674 qos.history(rclcpp::HistoryPolicy::SystemDefault);
676 rclcpp::PublisherOptions publisherOptions{};
678 auto publisher = this->create_generic_publisher(topicName, topicType, qos, publisherOptions);
680 RCLCPP_INFO(this->get_logger(),
681 "Client %s is advertising \"%s\" (%s) on channel %d with encoding \"%s\"",
682 _server->remoteEndpointString(hdl).c_str(), topicName.c_str(), topicType.c_str(),
686 clientPublications.emplace(advertisement.
channelId, std::move(publisher));
687 }
catch (
const std::exception& ex) {
689 std::string(
"Failed to create publisher: ") + ex.what());
699 channelId,
"Ignoring client unadvertisement from " +
_server->remoteEndpointString(hdl) +
700 " for unknown channel " + std::to_string(channelId) +
701 ", client has no advertised topics");
704 auto& clientPublications = it->second;
705 auto it2 = clientPublications.find(channelId);
706 if (it2 == clientPublications.end()) {
708 channelId,
"Ignoring client unadvertisement from " +
_server->remoteEndpointString(hdl) +
709 " for unknown channel " + std::to_string(channelId) +
", client has " +
710 std::to_string(clientPublications.size()) +
" advertised topic(s)");
713 const auto& publisher = it2->second;
714 RCLCPP_INFO(this->get_logger(),
715 "Client %s is no longer advertising %s (%zu subscribers) on channel %d",
716 _server->remoteEndpointString(hdl).c_str(), publisher->get_topic_name(),
717 publisher->get_subscription_count(), channelId);
719 clientPublications.erase(it2);
720 if (clientPublications.empty()) {
727 this->create_wall_timer(1s, []() {});
732 rclcpp::GenericPublisher::SharedPtr publisher;
740 channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
741 " for unknown channel " + std::to_string(channelId) +
742 ", client has no advertised topics");
745 auto& clientPublications = it->second;
746 auto it2 = clientPublications.find(channelId);
747 if (it2 == clientPublications.end()) {
749 channelId,
"Dropping client message from " +
_server->remoteEndpointString(hdl) +
750 " for unknown channel " + std::to_string(channelId) +
", client has " +
751 std::to_string(clientPublications.size()) +
" advertised topic(s)");
753 publisher = it2->second;
756 auto publishMessage = [publisher,
this](
const void* data,
size_t size) {
758 rclcpp::SerializedMessage serializedMessage{size};
759 auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
760 std::memcpy(rclSerializedMsg.buffer, data, size);
761 rclSerializedMsg.buffer_length = size;
764 publisher->publish(serializedMessage);
766 publisher->publish_as_loaned_msg(serializedMessage);
774 std::shared_ptr<RosMsgParser::Parser>
parser;
779 parser = parserIt->second;
784 "Dropping client message from " +
785 _server->remoteEndpointString(hdl) +
786 " with encoding \"json\": no parser found");
788 thread_local RosMsgParser::ROS2_Serializer serializer;
790 const std::string jsonMessage(
reinterpret_cast<const char*
>(message.
getData()),
793 parser->serializeFromJson(jsonMessage, &serializer);
794 publishMessage(serializer.getBufferData(), serializer.getBufferSize());
795 }
catch (
const std::exception& ex) {
797 "Dropping client message from " +
798 _server->remoteEndpointString(hdl) +
799 " with encoding \"json\": " + ex.what());
805 "Dropping client message from " +
_server->remoteEndpointString(hdl) +
811 const std::optional<std::string>& requestId,
817 std::vector<std::string> parameterNames(parameters.size());
818 for (
size_t i = 0; i < parameters.size(); ++i) {
819 parameterNames[i] = parameters[i].getName();
826 const std::optional<std::string>& requestId,
828 const auto params =
_paramInterface->getParams(parameters, std::chrono::seconds(5));
829 _server->publishParameterValues(hdl, params, requestId);
843 _server->updateParameterValues(parameters);
848 case LogLevel::Debug:
849 RCLCPP_DEBUG(this->get_logger(),
"[WS] %s", msg);
852 RCLCPP_INFO(this->get_logger(),
"[WS] %s", msg);
855 RCLCPP_WARN(this->get_logger(),
"[WS] %s", msg);
857 case LogLevel::Error:
858 RCLCPP_ERROR(this->get_logger(),
"[WS] %s", msg);
860 case LogLevel::Critical:
861 RCLCPP_FATAL(this->get_logger(),
"[WS] %s", msg);
868 std::shared_ptr<const rclcpp::SerializedMessage> msg) {
871 const auto timestamp = this->now().nanoseconds();
872 assert(timestamp >= 0 &&
"Timestamp is negative");
873 const auto rclSerializedMsg = msg->get_rcl_serialized_message();
874 _server->sendMessage(clientHandle, channelId,
static_cast<uint64_t
>(timestamp),
875 rclSerializedMsg.buffer, rclSerializedMsg.buffer_length);
880 RCLCPP_DEBUG(this->get_logger(),
"Received a request for service %d", request.
serviceId);
887 "Service with id " + std::to_string(request.
serviceId) +
" does not exist");
893 auto clientOptions = rcl_client_get_default_options();
894 auto genClient = GenericClient::make_shared(
895 this->get_node_base_interface().
get(), this->get_node_graph_interface(),
896 serviceIt->second.name, serviceIt->second.type, clientOptions);
899 }
catch (
const std::exception& ex) {
902 "Failed to create service client for service " + serviceIt->second.name +
": " + ex.what());
906 auto client = clientIt->second;
907 if (!client->wait_for_service(1s)) {
909 "Service " + serviceIt->second.name +
" is not available");
912 auto reqMessage = std::make_shared<rclcpp::SerializedMessage>(request.
data.size());
913 auto& rclSerializedMsg = reqMessage->get_rcl_serialized_message();
914 std::memcpy(rclSerializedMsg.buffer, request.
data.data(), request.
data.size());
915 rclSerializedMsg.buffer_length = request.
data.size();
917 auto responseReceivedCallback = [
this, request,
919 const auto serializedResponseMsg = future.get()->get_rcl_serialized_message();
921 std::vector<uint8_t>(serializedResponseMsg.buffer_length)};
922 std::memcpy(
response.data.data(), serializedResponseMsg.buffer,
923 serializedResponseMsg.buffer_length);
924 _server->sendServiceResponse(clientHandle, response);
926 client->async_send_request(reqMessage, responseReceivedCallback);
941 throw std::runtime_error(
"Asset URI not allowed: " + uri);
947 #if RESOURCE_RETRIEVER_VERSION_MAJOR > 3 || \
948 (RESOURCE_RETRIEVER_VERSION_MAJOR == 3 && RESOURCE_RETRIEVER_VERSION_MINOR > 6)
952 response.data.resize(memoryResource->data.size());
953 std::memcpy(
response.data.data(), memoryResource->data.data(), memoryResource->data.size());
959 std::memcpy(
response.data.data(), memoryResource.
data.get(), memoryResource.
size);
961 }
catch (
const std::exception& ex) {
962 RCLCPP_WARN(this->get_logger(),
"Failed to retrieve asset '%s': %s", uri.c_str(), ex.what());
964 response.errorMessage =
"Failed to retrieve asset " + uri;
968 _server->sendFetchAssetResponse(clientHandle, response);
978 #include <rclcpp_components/register_node_macro.hpp>