ros2_foxglove_bridge.cpp
Go to the documentation of this file.
1 #include <unordered_set>
2 
4 
5 namespace foxglove_bridge {
6 namespace {
7 inline bool isHiddenTopicOrService(const std::string& name) {
8  if (name.empty()) {
9  throw std::invalid_argument("Topic or service name can't be empty");
10  }
11  return name.front() == '_' || name.find("/_") != std::string::npos;
12 }
13 } // namespace
14 
15 using namespace std::chrono_literals;
16 using namespace std::placeholders;
18 
19 FoxgloveBridge::FoxgloveBridge(const rclcpp::NodeOptions& options)
20  : Node("foxglove_bridge", options) {
21  const char* rosDistro = std::getenv("ROS_DISTRO");
22  RCLCPP_INFO(this->get_logger(), "Starting foxglove_bridge (%s, %s@%s) with %s", rosDistro,
25 
26  declareParameters(this);
27 
28  const auto port = static_cast<uint16_t>(this->get_parameter(PARAM_PORT).as_int());
29  const auto address = this->get_parameter(PARAM_ADDRESS).as_string();
30  const auto send_buffer_limit =
31  static_cast<size_t>(this->get_parameter(PARAM_SEND_BUFFER_LIMIT).as_int());
32  const auto useTLS = this->get_parameter(PARAM_USETLS).as_bool();
33  const auto certfile = this->get_parameter(PARAM_CERTFILE).as_string();
34  const auto keyfile = this->get_parameter(PARAM_KEYFILE).as_string();
35  _minQosDepth = static_cast<size_t>(this->get_parameter(PARAM_MIN_QOS_DEPTH).as_int());
36  _maxQosDepth = static_cast<size_t>(this->get_parameter(PARAM_MAX_QOS_DEPTH).as_int());
37  const auto topicWhiteList = this->get_parameter(PARAM_TOPIC_WHITELIST).as_string_array();
38  _topicWhitelistPatterns = parseRegexStrings(this, topicWhiteList);
39  const auto serviceWhiteList = this->get_parameter(PARAM_SERVICE_WHITELIST).as_string_array();
40  _serviceWhitelistPatterns = parseRegexStrings(this, serviceWhiteList);
41  const auto paramWhiteList = this->get_parameter(PARAM_PARAMETER_WHITELIST).as_string_array();
42  const auto paramWhitelistPatterns = parseRegexStrings(this, paramWhiteList);
43  const auto useCompression = this->get_parameter(PARAM_USE_COMPRESSION).as_bool();
44  _useSimTime = this->get_parameter("use_sim_time").as_bool();
45  _capabilities = this->get_parameter(PARAM_CAPABILITIES).as_string_array();
46  const auto clientTopicWhiteList =
47  this->get_parameter(PARAM_CLIENT_TOPIC_WHITELIST).as_string_array();
48  const auto clientTopicWhiteListPatterns = parseRegexStrings(this, clientTopicWhiteList);
49  _includeHidden = this->get_parameter(PARAM_INCLUDE_HIDDEN).as_bool();
50 
51  const auto logHandler = std::bind(&FoxgloveBridge::logHandler, this, _1, _2);
52  foxglove::ServerOptions serverOptions;
53  serverOptions.capabilities = _capabilities;
54  if (_useSimTime) {
55  serverOptions.capabilities.push_back(foxglove::CAPABILITY_TIME);
56  }
57  serverOptions.supportedEncodings = {"cdr"};
58  serverOptions.metadata = {{"ROS_DISTRO", rosDistro}};
59  serverOptions.sendBufferLimitBytes = send_buffer_limit;
60  serverOptions.sessionId = std::to_string(std::time(nullptr));
61  serverOptions.useCompression = useCompression;
62  serverOptions.useTls = useTLS;
63  serverOptions.certfile = certfile;
64  serverOptions.keyfile = keyfile;
65  serverOptions.clientTopicWhitelistPatterns = clientTopicWhiteListPatterns;
66 
67  _server = foxglove::ServerFactory::createServer<ConnectionHandle>("foxglove_bridge", logHandler,
68  serverOptions);
69 
71  hdlrs.subscribeHandler = std::bind(&FoxgloveBridge::subscribe, this, _1, _2);
72  hdlrs.unsubscribeHandler = std::bind(&FoxgloveBridge::unsubscribe, this, _1, _2);
73  hdlrs.clientAdvertiseHandler = std::bind(&FoxgloveBridge::clientAdvertise, this, _1, _2);
74  hdlrs.clientUnadvertiseHandler = std::bind(&FoxgloveBridge::clientUnadvertise, this, _1, _2);
75  hdlrs.clientMessageHandler = std::bind(&FoxgloveBridge::clientMessage, this, _1, _2);
76  hdlrs.serviceRequestHandler = std::bind(&FoxgloveBridge::serviceRequest, this, _1, _2);
77  hdlrs.subscribeConnectionGraphHandler =
78  std::bind(&FoxgloveBridge::subscribeConnectionGraph, this, _1);
79 
82  hdlrs.parameterRequestHandler = std::bind(&FoxgloveBridge::getParameters, this, _1, _2, _3);
83  hdlrs.parameterChangeHandler = std::bind(&FoxgloveBridge::setParameters, this, _1, _2, _3);
84  hdlrs.parameterSubscriptionHandler =
85  std::bind(&FoxgloveBridge::subscribeParameters, this, _1, _2, _3);
86 
87  _paramInterface = std::make_shared<ParameterInterface>(this, paramWhitelistPatterns);
88  _paramInterface->setParamUpdateCallback(std::bind(&FoxgloveBridge::parameterUpdates, this, _1));
89  }
90 
91  _server->setHandlers(std::move(hdlrs));
92  _server->start(address, port);
93 
94  // Get the actual port we bound to
95  uint16_t listeningPort = _server->getPort();
96  if (port != listeningPort) {
97  RCLCPP_DEBUG(this->get_logger(), "Reassigning \"port\" parameter from %d to %d", port,
98  listeningPort);
99  this->set_parameter(rclcpp::Parameter{PARAM_PORT, listeningPort});
100  }
101 
102  // Start the thread polling for rosgraph changes
104  std::make_unique<std::thread>(std::bind(&FoxgloveBridge::rosgraphPollThread, this));
105 
106  _subscriptionCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
108  this->create_callback_group(rclcpp::CallbackGroupType::MutuallyExclusive);
109  _servicesCallbackGroup = this->create_callback_group(rclcpp::CallbackGroupType::Reentrant);
110 
111  if (_useSimTime) {
112  _clockSubscription = this->create_subscription<rosgraph_msgs::msg::Clock>(
113  "/clock", rclcpp::QoS{rclcpp::KeepLast(1)}.best_effort(),
114  [&](std::shared_ptr<rosgraph_msgs::msg::Clock> msg) {
115  const auto timestamp = rclcpp::Time{msg->clock}.nanoseconds();
116  assert(timestamp >= 0 && "Timestamp is negative");
117  _server->broadcastTime(static_cast<uint64_t>(timestamp));
118  });
119  }
120 }
121 
123  RCLCPP_INFO(this->get_logger(), "Shutting down %s", this->get_name());
124  if (_rosgraphPollThread) {
125  _rosgraphPollThread->join();
126  }
127  _server->stop();
128  RCLCPP_INFO(this->get_logger(), "Shutdown complete");
129 }
130 
132  updateAdvertisedTopics(get_topic_names_and_types());
134 
135  auto graphEvent = this->get_graph_event();
136  while (rclcpp::ok()) {
137  try {
138  this->wait_for_graph_change(graphEvent, 200ms);
139  bool triggered = graphEvent->check_and_clear();
140  if (triggered) {
141  RCLCPP_DEBUG(this->get_logger(), "rosgraph change detected");
142  const auto topicNamesAndTypes = get_topic_names_and_types();
143  updateAdvertisedTopics(topicNamesAndTypes);
146  updateConnectionGraph(topicNamesAndTypes);
147  }
148  // Graph changes tend to come in batches, so wait a bit before checking again
149  std::this_thread::sleep_for(500ms);
150  }
151  } catch (const std::exception& ex) {
152  RCLCPP_ERROR(this->get_logger(), "Exception thrown in rosgraphPollThread: %s", ex.what());
153  }
154  }
155 
156  RCLCPP_DEBUG(this->get_logger(), "rosgraph polling thread exiting");
157 }
158 
160  const std::map<std::string, std::vector<std::string>>& topicNamesAndTypes) {
161  if (!rclcpp::ok()) {
162  return;
163  }
164 
165  std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
166  latestTopics.reserve(topicNamesAndTypes.size());
167  for (const auto& topicNamesAndType : topicNamesAndTypes) {
168  const auto& topicName = topicNamesAndType.first;
169  const auto& datatypes = topicNamesAndType.second;
170 
171  // Ignore hidden topics if not explicitly included
172  if (!_includeHidden && isHiddenTopicOrService(topicName)) {
173  continue;
174  }
175 
176  // Ignore the topic if it is not on the topic whitelist
177  if (isWhitelisted(topicName, _topicWhitelistPatterns)) {
178  for (const auto& datatype : datatypes) {
179  latestTopics.emplace(topicName, datatype);
180  }
181  }
182  }
183 
184  if (const auto numIgnoredTopics = topicNamesAndTypes.size() - latestTopics.size()) {
185  RCLCPP_DEBUG(
186  this->get_logger(),
187  "%zu topics have been ignored as they do not match any pattern on the topic whitelist",
188  numIgnoredTopics);
189  }
190 
191  std::lock_guard<std::mutex> lock(_subscriptionsMutex);
192 
193  // Remove channels for which the topic does not exist anymore
194  std::vector<foxglove::ChannelId> channelIdsToRemove;
195  for (auto channelIt = _advertisedTopics.begin(); channelIt != _advertisedTopics.end();) {
196  const TopicAndDatatype topicAndDatatype = {channelIt->second.topic,
197  channelIt->second.schemaName};
198  if (latestTopics.find(topicAndDatatype) == latestTopics.end()) {
199  const auto channelId = channelIt->first;
200  channelIdsToRemove.push_back(channelId);
201  _subscriptions.erase(channelId);
202  RCLCPP_INFO(this->get_logger(), "Removed channel %d for topic \"%s\" (%s)", channelId,
203  topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
204  channelIt = _advertisedTopics.erase(channelIt);
205  } else {
206  channelIt++;
207  }
208  }
209  _server->removeChannels(channelIdsToRemove);
210 
211  // Add new channels for new topics
212  std::vector<foxglove::ChannelWithoutId> channelsToAdd;
213  for (const auto& topicAndDatatype : latestTopics) {
214  if (std::find_if(_advertisedTopics.begin(), _advertisedTopics.end(),
215  [topicAndDatatype](const auto& channelIdAndChannel) {
216  const auto& channel = channelIdAndChannel.second;
217  return channel.topic == topicAndDatatype.first &&
218  channel.schemaName == topicAndDatatype.second;
219  }) != _advertisedTopics.end()) {
220  continue; // Topic already advertised
221  }
222 
223  foxglove::ChannelWithoutId newChannel{};
224  newChannel.topic = topicAndDatatype.first;
225  newChannel.schemaName = topicAndDatatype.second;
226 
227  try {
228  auto [format, schema] = _messageDefinitionCache.get_full_text(topicAndDatatype.second);
229  switch (format) {
231  newChannel.encoding = "cdr";
232  newChannel.schema = schema;
233  newChannel.schemaEncoding = "ros2msg";
234  break;
236  newChannel.encoding = "cdr";
237  newChannel.schema = schema;
238  newChannel.schemaEncoding = "ros2idl";
239  break;
240  }
241 
242  } catch (const foxglove::DefinitionNotFoundError& err) {
243  RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s",
244  topicAndDatatype.second.c_str(), err.what());
245  // We still advertise the channel, but with an emtpy schema
246  newChannel.schema = "";
247  } catch (const std::exception& err) {
248  RCLCPP_WARN(this->get_logger(), "Failed to add channel for topic \"%s\" (%s): %s",
249  topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str(), err.what());
250  continue;
251  }
252 
253  channelsToAdd.push_back(newChannel);
254  }
255 
256  const auto channelIds = _server->addChannels(channelsToAdd);
257  for (size_t i = 0; i < channelsToAdd.size(); ++i) {
258  const auto channelId = channelIds[i];
259  const auto& channel = channelsToAdd[i];
260  _advertisedTopics.emplace(channelId, channel);
261  RCLCPP_DEBUG(this->get_logger(), "Advertising channel %d for topic \"%s\" (%s)", channelId,
262  channel.topic.c_str(), channel.schemaName.c_str());
263  }
264 }
265 
267  if (!rclcpp::ok()) {
268  return;
270  return;
271  }
272 
273  // Get the current list of visible services and datatypes from the ROS graph
274  const auto serviceNamesAndTypes = this->get_node_graph_interface()->get_service_names_and_types();
275 
276  std::lock_guard<std::mutex> lock(_servicesMutex);
277 
278  // Remove advertisements for services that have been removed
279  std::vector<foxglove::ServiceId> servicesToRemove;
280  for (const auto& service : _advertisedServices) {
281  const auto it = std::find_if(serviceNamesAndTypes.begin(), serviceNamesAndTypes.end(),
282  [service](const auto& serviceNameAndTypes) {
283  return serviceNameAndTypes.first == service.second.name;
284  });
285  if (it == serviceNamesAndTypes.end()) {
286  servicesToRemove.push_back(service.first);
287  }
288  }
289  for (auto serviceId : servicesToRemove) {
290  _advertisedServices.erase(serviceId);
291  }
292  _server->removeServices(servicesToRemove);
293 
294  // Advertise new services
295  std::vector<foxglove::ServiceWithoutId> newServices;
296  for (const auto& serviceNamesAndType : serviceNamesAndTypes) {
297  const auto& serviceName = serviceNamesAndType.first;
298  const auto& datatypes = serviceNamesAndType.second;
299 
300  // Ignore the service if it's already advertised
301  if (std::find_if(_advertisedServices.begin(), _advertisedServices.end(),
302  [serviceName](const auto& idWithService) {
303  return idWithService.second.name == serviceName;
304  }) != _advertisedServices.end()) {
305  continue;
306  }
307 
308  // Ignore hidden services if not explicitly included
309  if (!_includeHidden && isHiddenTopicOrService(serviceName)) {
310  continue;
311  }
312 
313  // Ignore the service if it is not on the service whitelist
314  if (!isWhitelisted(serviceName, _serviceWhitelistPatterns)) {
315  continue;
316  }
317 
319  service.name = serviceName;
320  service.type = datatypes.front();
321 
322  try {
323  const auto requestTypeName = service.type + foxglove::SERVICE_REQUEST_MESSAGE_SUFFIX;
324  const auto responseTypeName = service.type + foxglove::SERVICE_RESPONSE_MESSAGE_SUFFIX;
325  const auto [format, reqSchema] = _messageDefinitionCache.get_full_text(requestTypeName);
326  const auto resSchema = _messageDefinitionCache.get_full_text(responseTypeName).second;
327  switch (format) {
329  service.requestSchema = reqSchema;
330  service.responseSchema = resSchema;
331  break;
333  RCLCPP_WARN(this->get_logger(),
334  "IDL message definition format cannot be communicated over ws-protocol. "
335  "Service \"%s\" (%s) may not decode correctly in clients",
336  service.name.c_str(), service.type.c_str());
337  service.requestSchema = reqSchema;
338  service.responseSchema = resSchema;
339  break;
340  }
341  } catch (const foxglove::DefinitionNotFoundError& err) {
342  RCLCPP_WARN(this->get_logger(), "Could not find definition for type %s: %s",
343  service.type.c_str(), err.what());
344  // We still advertise the service, but with an emtpy schema
345  service.requestSchema = "";
346  service.responseSchema = "";
347  } catch (const std::exception& err) {
348  RCLCPP_WARN(this->get_logger(), "Failed to add service \"%s\" (%s): %s", service.name.c_str(),
349  service.type.c_str(), err.what());
350  continue;
351  }
352 
353  newServices.push_back(service);
354  }
355 
356  const auto serviceIds = _server->addServices(newServices);
357  for (size_t i = 0; i < serviceIds.size(); ++i) {
358  _advertisedServices.emplace(serviceIds[i], newServices[i]);
359  }
360 }
361 
363  const std::map<std::string, std::vector<std::string>>& topicNamesAndTypes) {
364  foxglove::MapOfSets publishers, subscribers;
365 
366  for (const auto& topicNameAndType : topicNamesAndTypes) {
367  const auto& topicName = topicNameAndType.first;
368  if (!isWhitelisted(topicName, _topicWhitelistPatterns)) {
369  continue;
370  }
371 
372  const auto publishersInfo = get_publishers_info_by_topic(topicName);
373  const auto subscribersInfo = get_subscriptions_info_by_topic(topicName);
374  std::unordered_set<std::string> publisherIds, subscriberIds;
375  for (const auto& publisher : publishersInfo) {
376  const auto& ns = publisher.node_namespace();
377  const auto sep = (!ns.empty() && ns.back() == '/') ? "" : "/";
378  publisherIds.insert(ns + sep + publisher.node_name());
379  }
380  for (const auto& subscriber : subscribersInfo) {
381  const auto& ns = subscriber.node_namespace();
382  const auto sep = (!ns.empty() && ns.back() == '/') ? "" : "/";
383  subscriberIds.insert(ns + sep + subscriber.node_name());
384  }
385  publishers.emplace(topicName, publisherIds);
386  subscribers.emplace(topicName, subscriberIds);
387  }
388 
389  foxglove::MapOfSets services;
390  for (const auto& fqnNodeName : get_node_names()) {
391  const auto [nodeNs, nodeName] = getNodeAndNodeNamespace(fqnNodeName);
392  const auto serviceNamesAndTypes = get_service_names_and_types_by_node(nodeName, nodeNs);
393 
394  for (const auto& [serviceName, serviceTypes] : serviceNamesAndTypes) {
395  (void)serviceTypes;
396  if (isWhitelisted(serviceName, _serviceWhitelistPatterns)) {
397  services[serviceName].insert(fqnNodeName);
398  }
399  }
400  }
401 
402  _server->updateConnectionGraph(publishers, subscribers, services);
403 }
404 
406  if ((_subscribeGraphUpdates = subscribe)) {
407  updateConnectionGraph(get_topic_names_and_types());
408  };
409 }
410 
411 void FoxgloveBridge::subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) {
412  std::lock_guard<std::mutex> lock(_subscriptionsMutex);
413  auto it = _advertisedTopics.find(channelId);
414  if (it == _advertisedTopics.end()) {
416  channelId, "Received subscribe request for unknown channel " + std::to_string(channelId));
417  }
418 
419  const auto& channel = it->second;
420  const auto& topic = channel.topic;
421  const auto& datatype = channel.schemaName;
422 
423  // Get client subscriptions for this channel or insert an empty map.
424  auto [subscriptionsIt, firstSubscription] =
425  _subscriptions.emplace(channelId, SubscriptionsByClient());
426  auto& subscriptionsByClient = subscriptionsIt->second;
427 
428  if (!firstSubscription &&
429  subscriptionsByClient.find(clientHandle) != subscriptionsByClient.end()) {
431  channelId, "Client is already subscribed to channel " + std::to_string(channelId));
432  }
433 
434  rclcpp::SubscriptionEventCallbacks eventCallbacks;
435  eventCallbacks.incompatible_qos_callback = [&](const rclcpp::QOSRequestedIncompatibleQoSInfo&) {
436  RCLCPP_ERROR(this->get_logger(), "Incompatible subscriber QoS settings for topic \"%s\" (%s)",
437  topic.c_str(), datatype.c_str());
438  };
439 
440  rclcpp::SubscriptionOptions subscriptionOptions;
441  subscriptionOptions.event_callbacks = eventCallbacks;
442  subscriptionOptions.callback_group = _subscriptionCallbackGroup;
443 
444  // Select an appropriate subscription QOS profile. This is similar to how ros2 topic echo
445  // does it:
446  // https://github.com/ros2/ros2cli/blob/619b3d1c9/ros2topic/ros2topic/verb/echo.py#L137-L194
447  size_t depth = 0;
448  size_t reliabilityReliableEndpointsCount = 0;
449  size_t durabilityTransientLocalEndpointsCount = 0;
450 
451  const auto publisherInfo = this->get_publishers_info_by_topic(topic);
452  for (const auto& publisher : publisherInfo) {
453  const auto& qos = publisher.qos_profile();
454  if (qos.reliability() == rclcpp::ReliabilityPolicy::Reliable) {
455  ++reliabilityReliableEndpointsCount;
456  }
457  if (qos.durability() == rclcpp::DurabilityPolicy::TransientLocal) {
458  ++durabilityTransientLocalEndpointsCount;
459  }
460  depth = std::min(_maxQosDepth, depth + qos.depth());
461  }
462 
463  rclcpp::QoS qos{rclcpp::KeepLast(std::max(depth, _minQosDepth))};
464 
465  // If all endpoints are reliable, ask for reliable
466  if (reliabilityReliableEndpointsCount == publisherInfo.size()) {
467  qos.reliable();
468  } else {
469  if (reliabilityReliableEndpointsCount > 0) {
470  RCLCPP_WARN(
471  this->get_logger(),
472  "Some, but not all, publishers on topic '%s' are offering QoSReliabilityPolicy.RELIABLE. "
473  "Falling back to QoSReliabilityPolicy.BEST_EFFORT as it will connect to all publishers",
474  topic.c_str());
475  }
476  qos.best_effort();
477  }
478 
479  // If all endpoints are transient_local, ask for transient_local
480  if (durabilityTransientLocalEndpointsCount == publisherInfo.size()) {
481  qos.transient_local();
482  } else {
483  if (durabilityTransientLocalEndpointsCount > 0) {
484  RCLCPP_WARN(this->get_logger(),
485  "Some, but not all, publishers on topic '%s' are offering "
486  "QoSDurabilityPolicy.TRANSIENT_LOCAL. Falling back to "
487  "QoSDurabilityPolicy.VOLATILE as it will connect to all publishers",
488  topic.c_str());
489  }
490  qos.durability_volatile();
491  }
492 
493  if (firstSubscription) {
494  RCLCPP_INFO(this->get_logger(), "Subscribing to topic \"%s\" (%s) on channel %d", topic.c_str(),
495  datatype.c_str(), channelId);
496 
497  } else {
498  RCLCPP_INFO(this->get_logger(), "Adding subscriber #%zu to topic \"%s\" (%s) on channel %d",
499  subscriptionsByClient.size(), topic.c_str(), datatype.c_str(), channelId);
500  }
501 
502  try {
503  auto subscriber = this->create_generic_subscription(
504  topic, datatype, qos,
505  std::bind(&FoxgloveBridge::rosMessageHandler, this, channelId, clientHandle, _1),
506  subscriptionOptions);
507  subscriptionsByClient.emplace(clientHandle, std::move(subscriber));
508  } catch (const std::exception& ex) {
510  channelId, "Failed to subscribe to topic " + topic + " (" + datatype + "): " + ex.what());
511  }
512 }
513 
515  std::lock_guard<std::mutex> lock(_subscriptionsMutex);
516 
517  const auto channelIt = _advertisedTopics.find(channelId);
518  if (channelIt == _advertisedTopics.end()) {
520  channelId, "Received unsubscribe request for unknown channel " + std::to_string(channelId));
521  }
522  const auto& channel = channelIt->second;
523 
524  auto subscriptionsIt = _subscriptions.find(channelId);
525  if (subscriptionsIt == _subscriptions.end()) {
526  throw foxglove::ChannelError(channelId, "Received unsubscribe request for channel " +
527  std::to_string(channelId) +
528  " that was not subscribed to");
529  }
530 
531  auto& subscriptionsByClient = subscriptionsIt->second;
532  const auto clientSubscription = subscriptionsByClient.find(clientHandle);
533  if (clientSubscription == subscriptionsByClient.end()) {
535  channelId, "Received unsubscribe request for channel " + std::to_string(channelId) +
536  "from a client that was not subscribed to this channel");
537  }
538 
539  subscriptionsByClient.erase(clientSubscription);
540  if (subscriptionsByClient.empty()) {
541  RCLCPP_INFO(this->get_logger(), "Unsubscribing from topic \"%s\" (%s) on channel %d",
542  channel.topic.c_str(), channel.schemaName.c_str(), channelId);
543  _subscriptions.erase(subscriptionsIt);
544  } else {
545  RCLCPP_INFO(this->get_logger(),
546  "Removed one subscription from channel %d (%zu subscription(s) left)", channelId,
547  subscriptionsByClient.size());
548  }
549 }
550 
552  ConnectionHandle hdl) {
553  std::lock_guard<std::mutex> lock(_clientAdvertisementsMutex);
554 
555  // Get client publications or insert an empty map.
556  auto [clientPublicationsIt, isFirstPublication] =
558 
559  auto& clientPublications = clientPublicationsIt->second;
560 
561  if (!isFirstPublication &&
562  clientPublications.find(advertisement.channelId) != clientPublications.end()) {
564  advertisement.channelId,
565  "Received client advertisement from " + _server->remoteEndpointString(hdl) + " for channel " +
566  std::to_string(advertisement.channelId) + " it had already advertised");
567  }
568 
569  try {
570  // Create a new topic advertisement
571  const auto& topicName = advertisement.topic;
572  const auto& topicType = advertisement.schemaName;
573 
574  // Lookup if there are publishers from other nodes for that topic. If that's the case, we use
575  // a matching QoS profile.
576  const auto otherPublishers = get_publishers_info_by_topic(topicName);
577  const auto otherPublisherIt =
578  std::find_if(otherPublishers.begin(), otherPublishers.end(),
579  [this](const rclcpp::TopicEndpointInfo& endpoint) {
580  return endpoint.node_name() != this->get_name() ||
581  endpoint.node_namespace() != this->get_namespace();
582  });
583  rclcpp::QoS qos = otherPublisherIt == otherPublishers.end() ? rclcpp::SystemDefaultsQoS()
584  : otherPublisherIt->qos_profile();
585 
586  // When the QoS profile is copied from another existing publisher, it can happen that the
587  // history policy is Unknown, leading to an error when subsequently trying to create a publisher
588  // with that QoS profile. As a fix, we explicitly set the history policy to the system default.
589  if (qos.history() == rclcpp::HistoryPolicy::Unknown) {
590  qos.history(rclcpp::HistoryPolicy::SystemDefault);
591  }
592  rclcpp::PublisherOptions publisherOptions{};
593  publisherOptions.callback_group = _clientPublishCallbackGroup;
594  auto publisher = this->create_generic_publisher(topicName, topicType, qos, publisherOptions);
595 
596  RCLCPP_INFO(this->get_logger(), "Client %s is advertising \"%s\" (%s) on channel %d",
597  _server->remoteEndpointString(hdl).c_str(), topicName.c_str(), topicType.c_str(),
598  advertisement.channelId);
599 
600  // Store the new topic advertisement
601  clientPublications.emplace(advertisement.channelId, std::move(publisher));
602  } catch (const std::exception& ex) {
603  throw foxglove::ClientChannelError(advertisement.channelId,
604  std::string("Failed to create publisher: ") + ex.what());
605  }
606 }
607 
609  std::lock_guard<std::mutex> lock(_clientAdvertisementsMutex);
610 
611  auto it = _clientAdvertisedTopics.find(hdl);
612  if (it == _clientAdvertisedTopics.end()) {
614  channelId, "Ignoring client unadvertisement from " + _server->remoteEndpointString(hdl) +
615  " for unknown channel " + std::to_string(channelId) +
616  ", client has no advertised topics");
617  }
618 
619  auto& clientPublications = it->second;
620  auto it2 = clientPublications.find(channelId);
621  if (it2 == clientPublications.end()) {
623  channelId, "Ignoring client unadvertisement from " + _server->remoteEndpointString(hdl) +
624  " for unknown channel " + std::to_string(channelId) + ", client has " +
625  std::to_string(clientPublications.size()) + " advertised topic(s)");
626  }
627 
628  const auto& publisher = it2->second;
629  RCLCPP_INFO(this->get_logger(),
630  "Client %s is no longer advertising %s (%zu subscribers) on channel %d",
631  _server->remoteEndpointString(hdl).c_str(), publisher->get_topic_name(),
632  publisher->get_subscription_count(), channelId);
633 
634  clientPublications.erase(it2);
635  if (clientPublications.empty()) {
636  _clientAdvertisedTopics.erase(it);
637  }
638 
639  // Create a timer that immedeately goes out of scope (so it never fires) which will trigger
640  // the previously destroyed publisher to be cleaned up. This is a workaround for
641  // https://github.com/ros2/rclcpp/issues/2146
642  this->create_wall_timer(1s, []() {});
643 }
644 
646  // Get the publisher
647  rclcpp::GenericPublisher::SharedPtr publisher;
648  {
649  const auto channelId = message.advertisement.channelId;
650  std::lock_guard<std::mutex> lock(_clientAdvertisementsMutex);
651 
652  auto it = _clientAdvertisedTopics.find(hdl);
653  if (it == _clientAdvertisedTopics.end()) {
655  channelId, "Dropping client message from " + _server->remoteEndpointString(hdl) +
656  " for unknown channel " + std::to_string(channelId) +
657  ", client has no advertised topics");
658  }
659 
660  auto& clientPublications = it->second;
661  auto it2 = clientPublications.find(channelId);
662  if (it2 == clientPublications.end()) {
664  channelId, "Dropping client message from " + _server->remoteEndpointString(hdl) +
665  " for unknown channel " + std::to_string(channelId) + ", client has " +
666  std::to_string(clientPublications.size()) + " advertised topic(s)");
667  }
668  publisher = it2->second;
669  }
670 
671  // Copy the message payload into a SerializedMessage object
672  rclcpp::SerializedMessage serializedMessage{message.getLength()};
673  auto& rclSerializedMsg = serializedMessage.get_rcl_serialized_message();
674  std::memcpy(rclSerializedMsg.buffer, message.getData(), message.getLength());
675  rclSerializedMsg.buffer_length = message.getLength();
676 
677  // Publish the message
678  publisher->publish(serializedMessage);
679 }
680 
681 void FoxgloveBridge::setParameters(const std::vector<foxglove::Parameter>& parameters,
682  const std::optional<std::string>& requestId,
683  ConnectionHandle hdl) {
684  _paramInterface->setParams(parameters, std::chrono::seconds(5));
685 
686  // If a request Id was given, send potentially updated parameters back to client
687  if (requestId) {
688  std::vector<std::string> parameterNames(parameters.size());
689  for (size_t i = 0; i < parameters.size(); ++i) {
690  parameterNames[i] = parameters[i].getName();
691  }
692  getParameters(parameterNames, requestId, hdl);
693  }
694 }
695 
696 void FoxgloveBridge::getParameters(const std::vector<std::string>& parameters,
697  const std::optional<std::string>& requestId,
698  ConnectionHandle hdl) {
699  const auto params = _paramInterface->getParams(parameters, std::chrono::seconds(5));
700  _server->publishParameterValues(hdl, params, requestId);
701 }
702 
703 void FoxgloveBridge::subscribeParameters(const std::vector<std::string>& parameters,
707  _paramInterface->subscribeParams(parameters);
708  } else {
709  _paramInterface->unsubscribeParams(parameters);
710  }
711 }
712 
713 void FoxgloveBridge::parameterUpdates(const std::vector<foxglove::Parameter>& parameters) {
714  _server->updateParameterValues(parameters);
715 }
716 
717 void FoxgloveBridge::logHandler(LogLevel level, char const* msg) {
718  switch (level) {
719  case LogLevel::Debug:
720  RCLCPP_DEBUG(this->get_logger(), "[WS] %s", msg);
721  break;
722  case LogLevel::Info:
723  RCLCPP_INFO(this->get_logger(), "[WS] %s", msg);
724  break;
725  case LogLevel::Warn:
726  RCLCPP_WARN(this->get_logger(), "[WS] %s", msg);
727  break;
728  case LogLevel::Error:
729  RCLCPP_ERROR(this->get_logger(), "[WS] %s", msg);
730  break;
731  case LogLevel::Critical:
732  RCLCPP_FATAL(this->get_logger(), "[WS] %s", msg);
733  break;
734  }
735 }
736 
738  ConnectionHandle clientHandle,
739  std::shared_ptr<rclcpp::SerializedMessage> msg) {
740  // NOTE: Do not call any RCLCPP_* logging functions from this function. Otherwise, subscribing
741  // to `/rosout` will cause a feedback loop
742  const auto timestamp = this->now().nanoseconds();
743  assert(timestamp >= 0 && "Timestamp is negative");
744  const auto rclSerializedMsg = msg->get_rcl_serialized_message();
745  _server->sendMessage(clientHandle, channelId, static_cast<uint64_t>(timestamp),
746  rclSerializedMsg.buffer, rclSerializedMsg.buffer_length);
747 }
748 
750  ConnectionHandle clientHandle) {
751  RCLCPP_DEBUG(this->get_logger(), "Received a request for service %d", request.serviceId);
752 
753  std::lock_guard<std::mutex> lock(_servicesMutex);
754  const auto serviceIt = _advertisedServices.find(request.serviceId);
755  if (serviceIt == _advertisedServices.end()) {
757  request.serviceId,
758  "Service with id " + std::to_string(request.serviceId) + " does not exist");
759  }
760 
761  auto clientIt = _serviceClients.find(request.serviceId);
762  if (clientIt == _serviceClients.end()) {
763  try {
764  auto clientOptions = rcl_client_get_default_options();
765  auto genClient = GenericClient::make_shared(
766  this->get_node_base_interface().get(), this->get_node_graph_interface(),
767  serviceIt->second.name, serviceIt->second.type, clientOptions);
768  clientIt = _serviceClients.emplace(request.serviceId, std::move(genClient)).first;
769  this->get_node_services_interface()->add_client(clientIt->second, _servicesCallbackGroup);
770  } catch (const std::exception& ex) {
772  request.serviceId,
773  "Failed to create service client for service " + serviceIt->second.name + ": " + ex.what());
774  }
775  }
776 
777  auto client = clientIt->second;
778  if (!client->wait_for_service(1s)) {
779  throw foxglove::ServiceError(request.serviceId,
780  "Service " + serviceIt->second.name + " is not available");
781  }
782 
783  auto reqMessage = std::make_shared<rclcpp::SerializedMessage>(request.data.size());
784  auto& rclSerializedMsg = reqMessage->get_rcl_serialized_message();
785  std::memcpy(rclSerializedMsg.buffer, request.data.data(), request.data.size());
786  rclSerializedMsg.buffer_length = request.data.size();
787 
788  auto responseReceivedCallback = [this, request,
789  clientHandle](GenericClient::SharedFuture future) {
790  const auto serializedResponseMsg = future.get()->get_rcl_serialized_message();
791  foxglove::ServiceRequest response{request.serviceId, request.callId, request.encoding,
792  std::vector<uint8_t>(serializedResponseMsg.buffer_length)};
793  std::memcpy(response.data.data(), serializedResponseMsg.buffer,
794  serializedResponseMsg.buffer_length);
795  _server->sendServiceResponse(clientHandle, response);
796  };
797  client->async_send_request(reqMessage, responseReceivedCallback);
798 }
799 
800 bool FoxgloveBridge::hasCapability(const std::string& capability) {
801  return std::find(_capabilities.begin(), _capabilities.end(), capability) != _capabilities.end();
802 }
803 
804 } // namespace foxglove_bridge
805 
806 #include <rclcpp_components/register_node_macro.hpp>
807 
808 // Register the component with class_loader.
809 // This acts as a sort of entry point, allowing the component to be discoverable when its library
810 // is being loaded into a running process.
811 RCLCPP_COMPONENTS_REGISTER_NODE(foxglove_bridge::FoxgloveBridge)
const char FOXGLOVE_BRIDGE_GIT_HASH[]
ClientAdvertisement advertisement
Definition: common.hpp:87
std::unique_ptr< std::thread > _rosgraphPollThread
constexpr char SERVICE_RESPONSE_MESSAGE_SUFFIX[]
std::unordered_map< foxglove::ServiceId, foxglove::ServiceWithoutId > _advertisedServices
std::pair< std::string, std::string > getNodeAndNodeNamespace(const std::string &fqnNodeName)
Definition: utils.hpp:12
foxglove::MessageDefinitionCache _messageDefinitionCache
std::shared_future< SharedResponse > SharedFuture
std::vector< std::regex > _serviceWhitelistPatterns
XmlRpcServer s
std::vector< std::regex > clientTopicWhitelistPatterns
void getParameters(const std::vector< std::string > &parameters, const std::optional< std::string > &requestId, ConnectionHandle hdl)
void unsubscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle)
bool isWhitelisted(const std::string &name, const std::vector< std::regex > &regexPatterns)
Definition: regex_utils.hpp:10
void subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle)
std::string responseSchema
Definition: common.hpp:116
uint32_t ChannelId
Definition: common.hpp:25
bool hasCapability(const std::string &capability)
std::pair< std::string, std::string > TopicAndDatatype
void subscribeParameters(const std::vector< std::string > &parameters, foxglove::ParameterSubscriptionOperation op, ConnectionHandle)
const char FOXGLOVE_BRIDGE_VERSION[]
std::vector< uint8_t > data
Definition: common.hpp:132
std::unordered_map< std::string, std::string > metadata
rclcpp::CallbackGroup::SharedPtr _clientPublishCallbackGroup
std::function< void(ChannelId, ConnectionHandle)> subscribeHandler
rclcpp::CallbackGroup::SharedPtr _subscriptionCallbackGroup
void setParameters(const std::vector< foxglove::Parameter > &parameters, const std::optional< std::string > &requestId, ConnectionHandle hdl)
std::unordered_map< foxglove::ChannelId, SubscriptionsByClient > _subscriptions
std::unordered_map< std::string, std::unordered_set< std::string > > MapOfSets
constexpr char CAPABILITY_PARAMETERS_SUBSCRIBE[]
Definition: common.hpp:16
const char * datatype()
std::size_t getLength() const
Definition: common.hpp:107
void subscribeConnectionGraph(bool subscribe)
void logHandler(foxglove::WebSocketLogLevel level, char const *msg)
void clientMessage(const foxglove::ClientMessage &clientMsg, ConnectionHandle clientHandle)
void updateConnectionGraph(const std::map< std::string, std::vector< std::string >> &topicNamesAndTypes)
constexpr char CAPABILITY_TIME[]
Definition: common.hpp:14
const char * what() const noexcept override
std::map< ConnectionHandle, ros::Subscriber, std::owner_less<> > SubscriptionsByClient
constexpr char SERVICE_REQUEST_MESSAGE_SUFFIX[]
void parameterUpdates(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
rclcpp::CallbackGroup::SharedPtr _servicesCallbackGroup
const char * WebSocketUserAgent()
std::pair< MessageDefinitionFormat, std::string > get_full_text(const std::string &package_resource_name)
constexpr char CAPABILITY_SERVICES[]
Definition: common.hpp:17
std::vector< std::string > capabilities
ParameterSubscriptionOperation
Definition: parameter.hpp:11
void declareParameters(rclcpp::Node *node)
void serviceRequest(const foxglove::ServiceRequest &request, ConnectionHandle clientHandle)
websocketpp::connection_hdl ConnectionHandle
std::vector< std::regex > parseRegexStrings(rclcpp::Node *node, const std::vector< std::string > &strings)
constexpr char CAPABILITY_PARAMETERS[]
Definition: common.hpp:15
std::string requestSchema
Definition: common.hpp:115
const uint8_t * getData() const
Definition: common.hpp:104
std::unordered_map< foxglove::ChannelId, foxglove::ChannelWithoutId > _advertisedTopics
std::unordered_map< foxglove::ClientChannelId, ros::Publisher > ClientPublications
ClientChannelId channelId
Definition: common.hpp:76
std::unique_ptr< foxglove::ServerInterface< ConnectionHandle > > _server
void rosMessageHandler(const foxglove::ChannelId channelId, ConnectionHandle clientHandle, const ros::MessageEvent< ros_babel_fish::BabelFishMessage const > &msgEvent)
void clientUnadvertise(foxglove::ClientChannelId channelId, ConnectionHandle clientHandle)
std::shared_ptr< ParameterInterface > _paramInterface
void clientAdvertise(const foxglove::ClientAdvertisement &channel, ConnectionHandle clientHandle)
WebSocketLogLevel
Definition: common.hpp:41
std::unordered_map< foxglove::ServiceId, GenericClient::SharedPtr > _serviceClients
Definition: format.py:1
const std::string response
std::vector< std::string > supportedEncodings


foxglove_bridge
Author(s): Foxglove
autogenerated on Mon Jul 3 2023 02:12:22