ros1_foxglove_bridge_nodelet.cpp
Go to the documentation of this file.
1 #include <atomic>
2 #include <functional>
3 #include <memory>
4 #include <mutex>
5 #include <regex>
6 #include <shared_mutex>
7 #include <string>
8 #include <unordered_set>
9 
10 #include <nodelet/nodelet.h>
12 #include <ros/message_event.h>
13 #include <ros/ros.h>
14 #include <ros/xmlrpc_manager.h>
15 #include <ros_babel_fish/babel_fish_message.h>
16 #include <ros_babel_fish/generation/providers/integrated_description_provider.h>
17 #include <rosgraph_msgs/Clock.h>
18 #include <websocketpp/common/connection_hdl.hpp>
19 
22 #include <foxglove_bridge/param_utils.hpp>
27 
28 namespace {
29 
30 inline std::unordered_set<std::string> rpcValueToStringSet(const XmlRpc::XmlRpcValue& v) {
31  std::unordered_set<std::string> set;
32  for (int i = 0; i < v.size(); ++i) {
33  set.insert(v[i]);
34  }
35  return set;
36 }
37 
38 } // namespace
39 
40 namespace foxglove_bridge {
41 
42 constexpr int DEFAULT_PORT = 8765;
43 constexpr char DEFAULT_ADDRESS[] = "0.0.0.0";
44 constexpr int DEFAULT_MAX_UPDATE_MS = 5000;
45 constexpr char ROS1_CHANNEL_ENCODING[] = "ros1";
46 constexpr uint32_t SUBSCRIPTION_QUEUE_LENGTH = 10;
47 constexpr double MIN_UPDATE_PERIOD_MS = 100.0;
48 constexpr uint32_t PUBLICATION_QUEUE_LENGTH = 10;
50 
51 using ConnectionHandle = websocketpp::connection_hdl;
52 using TopicAndDatatype = std::pair<std::string, std::string>;
53 using SubscriptionsByClient = std::map<ConnectionHandle, ros::Subscriber, std::owner_less<>>;
54 using ClientPublications = std::unordered_map<foxglove::ClientChannelId, ros::Publisher>;
55 using PublicationsByClient = std::map<ConnectionHandle, ClientPublications, std::owner_less<>>;
57 
59 public:
60  FoxgloveBridge() = default;
61  virtual void onInit() {
62  auto& nhp = getPrivateNodeHandle();
63  const auto address = nhp.param<std::string>("address", DEFAULT_ADDRESS);
64  const int port = nhp.param<int>("port", DEFAULT_PORT);
65  const auto send_buffer_limit = static_cast<size_t>(
66  nhp.param<int>("send_buffer_limit", foxglove::DEFAULT_SEND_BUFFER_LIMIT_BYTES));
67  const auto useTLS = nhp.param<bool>("tls", false);
68  const auto certfile = nhp.param<std::string>("certfile", "");
69  const auto keyfile = nhp.param<std::string>("keyfile", "");
70  _maxUpdateMs = static_cast<size_t>(nhp.param<int>("max_update_ms", DEFAULT_MAX_UPDATE_MS));
71  const auto useCompression = nhp.param<bool>("use_compression", false);
72  _useSimTime = nhp.param<bool>("/use_sim_time", false);
73  const auto sessionId = nhp.param<std::string>("/run_id", std::to_string(std::time(nullptr)));
74  _capabilities = nhp.param<std::vector<std::string>>(
75  "capabilities", std::vector<std::string>(foxglove::DEFAULT_CAPABILITIES.begin(),
77 
78  const auto topicWhitelistPatterns =
79  nhp.param<std::vector<std::string>>("topic_whitelist", {".*"});
80  _topicWhitelistPatterns = parseRegexPatterns(topicWhitelistPatterns);
81  if (topicWhitelistPatterns.size() != _topicWhitelistPatterns.size()) {
82  ROS_ERROR("Failed to parse one or more topic whitelist patterns");
83  }
84  const auto paramWhitelist = nhp.param<std::vector<std::string>>("param_whitelist", {".*"});
86  if (paramWhitelist.size() != _paramWhitelistPatterns.size()) {
87  ROS_ERROR("Failed to parse one or more param whitelist patterns");
88  }
89 
90  const auto serviceWhitelist = nhp.param<std::vector<std::string>>("service_whitelist", {".*"});
92  if (serviceWhitelist.size() != _serviceWhitelistPatterns.size()) {
93  ROS_ERROR("Failed to parse one or more service whitelist patterns");
94  }
95 
96  const auto clientTopicWhitelist =
97  nhp.param<std::vector<std::string>>("client_topic_whitelist", {".*"});
98  const auto clientTopicWhitelistPatterns = parseRegexPatterns(clientTopicWhitelist);
99  if (clientTopicWhitelist.size() != clientTopicWhitelistPatterns.size()) {
100  ROS_ERROR("Failed to parse one or more service whitelist patterns");
101  }
102 
103  const char* rosDistro = std::getenv("ROS_DISTRO");
104  ROS_INFO("Starting foxglove_bridge (%s, %s@%s) with %s", rosDistro,
107 
108  try {
109  foxglove::ServerOptions serverOptions;
110  serverOptions.capabilities = _capabilities;
111  if (_useSimTime) {
112  serverOptions.capabilities.push_back(foxglove::CAPABILITY_TIME);
113  }
114  serverOptions.supportedEncodings = {ROS1_CHANNEL_ENCODING};
115  serverOptions.metadata = {{"ROS_DISTRO", rosDistro}};
116  serverOptions.sendBufferLimitBytes = send_buffer_limit;
117  serverOptions.sessionId = sessionId;
118  serverOptions.useTls = useTLS;
119  serverOptions.certfile = certfile;
120  serverOptions.keyfile = keyfile;
121  serverOptions.useCompression = useCompression;
122  serverOptions.clientTopicWhitelistPatterns = clientTopicWhitelistPatterns;
123 
124  const auto logHandler =
125  std::bind(&FoxgloveBridge::logHandler, this, std::placeholders::_1, std::placeholders::_2);
126 
127  _server = foxglove::ServerFactory::createServer<ConnectionHandle>("foxglove_bridge",
128  logHandler, serverOptions);
130  hdlrs.subscribeHandler =
131  std::bind(&FoxgloveBridge::subscribe, this, std::placeholders::_1, std::placeholders::_2);
132  hdlrs.unsubscribeHandler =
133  std::bind(&FoxgloveBridge::unsubscribe, this, std::placeholders::_1, std::placeholders::_2);
134  hdlrs.clientAdvertiseHandler = std::bind(&FoxgloveBridge::clientAdvertise, this,
135  std::placeholders::_1, std::placeholders::_2);
136  hdlrs.clientUnadvertiseHandler = std::bind(&FoxgloveBridge::clientUnadvertise, this,
137  std::placeholders::_1, std::placeholders::_2);
138  hdlrs.clientMessageHandler = std::bind(&FoxgloveBridge::clientMessage, this,
139  std::placeholders::_1, std::placeholders::_2);
140  hdlrs.parameterRequestHandler =
141  std::bind(&FoxgloveBridge::getParameters, this, std::placeholders::_1,
142  std::placeholders::_2, std::placeholders::_3);
143  hdlrs.parameterChangeHandler =
144  std::bind(&FoxgloveBridge::setParameters, this, std::placeholders::_1,
145  std::placeholders::_2, std::placeholders::_3);
146  hdlrs.parameterSubscriptionHandler =
147  std::bind(&FoxgloveBridge::subscribeParameters, this, std::placeholders::_1,
148  std::placeholders::_2, std::placeholders::_3);
149  hdlrs.serviceRequestHandler = std::bind(&FoxgloveBridge::serviceRequest, this,
150  std::placeholders::_1, std::placeholders::_2);
151  hdlrs.subscribeConnectionGraphHandler = [this](bool subscribe) {
153  };
154  _server->setHandlers(std::move(hdlrs));
155 
156  _server->start(address, static_cast<uint16_t>(port));
157 
158  xmlrpcServer.bind("paramUpdate", std::bind(&FoxgloveBridge::parameterUpdates, this,
159  std::placeholders::_1, std::placeholders::_2));
161 
163 
164  if (_useSimTime) {
165  _clockSubscription = getMTNodeHandle().subscribe<rosgraph_msgs::Clock>(
166  "/clock", 10, [&](const rosgraph_msgs::Clock::ConstPtr msg) {
167  _server->broadcastTime(msg->clock.toNSec());
168  });
169  }
170  } catch (const std::exception& err) {
171  ROS_ERROR("Failed to start websocket server: %s", err.what());
172  // Rethrow exception such that the nodelet is unloaded.
173  throw err;
174  }
175  };
176  virtual ~FoxgloveBridge() {
178  if (_server) {
179  _server->stop();
180  }
181  }
182 
183 private:
184  struct PairHash {
185  template <class T1, class T2>
186  std::size_t operator()(const std::pair<T1, T2>& pair) const {
187  return std::hash<T1>()(pair.first) ^ std::hash<T2>()(pair.second);
188  }
189  };
190 
191  void subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) {
192  std::lock_guard<std::mutex> lock(_subscriptionsMutex);
193 
194  auto it = _advertisedTopics.find(channelId);
195  if (it == _advertisedTopics.end()) {
196  const std::string errMsg =
197  "Received subscribe request for unknown channel " + std::to_string(channelId);
198  ROS_WARN_STREAM(errMsg);
199  throw foxglove::ChannelError(channelId, errMsg);
200  }
201 
202  const auto& channel = it->second;
203  const auto& topic = channel.topic;
204  const auto& datatype = channel.schemaName;
205 
206  // Get client subscriptions for this channel or insert an empty map.
207  auto [subscriptionsIt, firstSubscription] =
208  _subscriptions.emplace(channelId, SubscriptionsByClient());
209  auto& subscriptionsByClient = subscriptionsIt->second;
210 
211  if (!firstSubscription &&
212  subscriptionsByClient.find(clientHandle) != subscriptionsByClient.end()) {
213  const std::string errMsg =
214  "Client is already subscribed to channel " + std::to_string(channelId);
215  ROS_WARN_STREAM(errMsg);
216  throw foxglove::ChannelError(channelId, errMsg);
217  }
218 
219  try {
220  subscriptionsByClient.emplace(
221  clientHandle, getMTNodeHandle().subscribe<ros_babel_fish::BabelFishMessage>(
222  topic, SUBSCRIPTION_QUEUE_LENGTH,
223  std::bind(&FoxgloveBridge::rosMessageHandler, this, channelId, clientHandle,
224  std::placeholders::_1)));
225  if (firstSubscription) {
226  ROS_INFO("Subscribed to topic \"%s\" (%s) on channel %d", topic.c_str(), datatype.c_str(),
227  channelId);
228 
229  } else {
230  ROS_INFO("Added subscriber #%zu to topic \"%s\" (%s) on channel %d",
231  subscriptionsByClient.size(), topic.c_str(), datatype.c_str(), channelId);
232  }
233  } catch (const std::exception& ex) {
234  const std::string errMsg =
235  "Failed to subscribe to topic '" + topic + "' (" + datatype + "): " + ex.what();
236  ROS_ERROR_STREAM(errMsg);
237  throw foxglove::ChannelError(channelId, errMsg);
238  }
239  }
240 
241  void unsubscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle) {
242  std::lock_guard<std::mutex> lock(_subscriptionsMutex);
243 
244  const auto channelIt = _advertisedTopics.find(channelId);
245  if (channelIt == _advertisedTopics.end()) {
246  const std::string errMsg =
247  "Received unsubscribe request for unknown channel " + std::to_string(channelId);
248  ROS_WARN_STREAM(errMsg);
249  throw foxglove::ChannelError(channelId, errMsg);
250  }
251  const auto& channel = channelIt->second;
252 
253  auto subscriptionsIt = _subscriptions.find(channelId);
254  if (subscriptionsIt == _subscriptions.end()) {
255  throw foxglove::ChannelError(channelId, "Received unsubscribe request for channel " +
256  std::to_string(channelId) +
257  " that was not subscribed to ");
258  }
259 
260  auto& subscriptionsByClient = subscriptionsIt->second;
261  const auto clientSubscription = subscriptionsByClient.find(clientHandle);
262  if (clientSubscription == subscriptionsByClient.end()) {
264  channelId, "Received unsubscribe request for channel " + std::to_string(channelId) +
265  "from a client that was not subscribed to this channel");
266  }
267 
268  subscriptionsByClient.erase(clientSubscription);
269  if (subscriptionsByClient.empty()) {
270  ROS_INFO("Unsubscribing from topic \"%s\" (%s) on channel %d", channel.topic.c_str(),
271  channel.schemaName.c_str(), channelId);
272  _subscriptions.erase(subscriptionsIt);
273  } else {
274  ROS_INFO("Removed one subscription from channel %d (%zu subscription(s) left)", channelId,
275  subscriptionsByClient.size());
276  }
277  }
278 
280  ConnectionHandle clientHandle) {
281  if (channel.encoding != ROS1_CHANNEL_ENCODING) {
283  channel.channelId, "Unsupported encoding. Only '" + std::string(ROS1_CHANNEL_ENCODING) +
284  "' encoding is supported at the moment.");
285  }
286 
287  std::unique_lock<std::shared_mutex> lock(_publicationsMutex);
288 
289  // Get client publications or insert an empty map.
290  auto [clientPublicationsIt, isFirstPublication] =
291  _clientAdvertisedTopics.emplace(clientHandle, ClientPublications());
292 
293  auto& clientPublications = clientPublicationsIt->second;
294  if (!isFirstPublication &&
295  clientPublications.find(channel.channelId) != clientPublications.end()) {
297  channel.channelId, "Received client advertisement from " +
298  _server->remoteEndpointString(clientHandle) + " for channel " +
299  std::to_string(channel.channelId) + " it had already advertised");
300  }
301 
302  const auto msgDescription = _rosTypeInfoProvider.getMessageDescription(channel.schemaName);
303  if (!msgDescription) {
305  channel.channelId, "Failed to retrieve type information of data type '" +
306  channel.schemaName + "'. Unable to advertise topic " + channel.topic);
307  }
308 
309  ros::AdvertiseOptions advertiseOptions;
310  advertiseOptions.datatype = channel.schemaName;
311  advertiseOptions.has_header = false; // TODO
312  advertiseOptions.latch = false;
313  advertiseOptions.md5sum = msgDescription->md5;
314  advertiseOptions.message_definition = msgDescription->message_definition;
315  advertiseOptions.queue_size = PUBLICATION_QUEUE_LENGTH;
316  advertiseOptions.topic = channel.topic;
317  auto publisher = getMTNodeHandle().advertise(advertiseOptions);
318 
319  if (publisher) {
320  clientPublications.insert({channel.channelId, std::move(publisher)});
321  ROS_INFO("Client %s is advertising \"%s\" (%s) on channel %d",
322  _server->remoteEndpointString(clientHandle).c_str(), channel.topic.c_str(),
323  channel.schemaName.c_str(), channel.channelId);
324  } else {
325  const auto errMsg =
326  "Failed to create publisher for topic " + channel.topic + "(" + channel.schemaName + ")";
327  ROS_ERROR_STREAM(errMsg);
328  throw foxglove::ClientChannelError(channel.channelId, errMsg);
329  }
330  }
331 
333  std::unique_lock<std::shared_mutex> lock(_publicationsMutex);
334 
335  auto clientPublicationsIt = _clientAdvertisedTopics.find(clientHandle);
336  if (clientPublicationsIt == _clientAdvertisedTopics.end()) {
338  channelId, "Ignoring client unadvertisement from " +
339  _server->remoteEndpointString(clientHandle) + " for unknown channel " +
340  std::to_string(channelId) + ", client has no advertised topics");
341  }
342 
343  auto& clientPublications = clientPublicationsIt->second;
344 
345  auto channelPublicationIt = clientPublications.find(channelId);
346  if (channelPublicationIt == clientPublications.end()) {
348  channelId, "Ignoring client unadvertisement from " +
349  _server->remoteEndpointString(clientHandle) + " for unknown channel " +
350  std::to_string(channelId) + ", client has " +
351  std::to_string(clientPublications.size()) + " advertised topic(s)");
352  }
353 
354  const auto& publisher = channelPublicationIt->second;
355  ROS_INFO("Client %s is no longer advertising %s (%d subscribers) on channel %d",
356  _server->remoteEndpointString(clientHandle).c_str(), publisher.getTopic().c_str(),
357  publisher.getNumSubscribers(), channelId);
358  clientPublications.erase(channelPublicationIt);
359 
360  if (clientPublications.empty()) {
361  _clientAdvertisedTopics.erase(clientPublicationsIt);
362  }
363  }
364 
365  void clientMessage(const foxglove::ClientMessage& clientMsg, ConnectionHandle clientHandle) {
366  ros_babel_fish::BabelFishMessage::Ptr msg(new ros_babel_fish::BabelFishMessage);
367  msg->read(clientMsg);
368 
369  const auto channelId = clientMsg.advertisement.channelId;
370  std::shared_lock<std::shared_mutex> lock(_publicationsMutex);
371 
372  auto clientPublicationsIt = _clientAdvertisedTopics.find(clientHandle);
373  if (clientPublicationsIt == _clientAdvertisedTopics.end()) {
375  channelId, "Dropping client message from " + _server->remoteEndpointString(clientHandle) +
376  " for unknown channel " + std::to_string(channelId) +
377  ", client has no advertised topics");
378  }
379 
380  auto& clientPublications = clientPublicationsIt->second;
381 
382  auto channelPublicationIt = clientPublications.find(clientMsg.advertisement.channelId);
383  if (channelPublicationIt == clientPublications.end()) {
385  channelId, "Dropping client message from " + _server->remoteEndpointString(clientHandle) +
386  " for unknown channel " + std::to_string(channelId) + ", client has " +
387  std::to_string(clientPublications.size()) + " advertised topic(s)");
388  }
389 
390  try {
391  channelPublicationIt->second.publish(msg);
392  } catch (const std::exception& ex) {
393  throw foxglove::ClientChannelError(channelId, "Failed to publish message on topic '" +
394  channelPublicationIt->second.getTopic() +
395  "': " + ex.what());
396  }
397  }
398 
400  _updateTimer.stop();
401  if (!ros::ok()) {
402  return;
403  }
404 
405  const bool servicesEnabled = hasCapability(foxglove::CAPABILITY_SERVICES);
406  const bool querySystemState = servicesEnabled || _subscribeGraphUpdates;
407 
408  std::vector<std::string> serviceNames;
409  foxglove::MapOfSets publishers, subscribers, services;
410 
411  // Retrieve system state from ROS master.
412  if (querySystemState) {
413  XmlRpc::XmlRpcValue params, result, payload;
414  params[0] = this->getName();
415  if (ros::master::execute("getSystemState", params, result, payload, false) &&
416  static_cast<int>(result[0]) == 1) {
417  const auto& systemState = result[2];
418  const auto& publishersXmlRpc = systemState[0];
419  const auto& subscribersXmlRpc = systemState[1];
420  const auto& servicesXmlRpc = systemState[2];
421 
422  for (int i = 0; i < servicesXmlRpc.size(); ++i) {
423  const std::string& name = servicesXmlRpc[i][0];
425  serviceNames.push_back(name);
426  services.emplace(name, rpcValueToStringSet(servicesXmlRpc[i][1]));
427  }
428  }
429  for (int i = 0; i < publishersXmlRpc.size(); ++i) {
430  const std::string& name = publishersXmlRpc[i][0];
432  publishers.emplace(name, rpcValueToStringSet(publishersXmlRpc[i][1]));
433  }
434  }
435  for (int i = 0; i < subscribersXmlRpc.size(); ++i) {
436  const std::string& name = subscribersXmlRpc[i][0];
438  subscribers.emplace(name, rpcValueToStringSet(subscribersXmlRpc[i][1]));
439  }
440  }
441  } else {
442  ROS_WARN("Failed to call getSystemState: %s", result.toXml().c_str());
443  }
444  }
445 
447  if (servicesEnabled) {
448  updateAdvertisedServices(serviceNames);
449  }
451  _server->updateConnectionGraph(publishers, subscribers, services);
452  }
453 
454  // Schedule the next update using truncated exponential backoff, between `MIN_UPDATE_PERIOD_MS`
455  // and `_maxUpdateMs`
456  _updateCount++;
457  const auto nextUpdateMs = std::max(
458  MIN_UPDATE_PERIOD_MS, static_cast<double>(std::min(size_t(1) << _updateCount, _maxUpdateMs)));
461  }
462 
464  // Get the current list of visible topics and datatypes from the ROS graph
465  std::vector<ros::master::TopicInfo> topicNamesAndTypes;
466  if (!ros::master::getTopics(topicNamesAndTypes)) {
467  ROS_WARN("Failed to retrieve published topics from ROS master.");
468  return;
469  }
470 
471  std::unordered_set<TopicAndDatatype, PairHash> latestTopics;
472  latestTopics.reserve(topicNamesAndTypes.size());
473  for (const auto& topicNameAndType : topicNamesAndTypes) {
474  const auto& topicName = topicNameAndType.name;
475  const auto& datatype = topicNameAndType.datatype;
476 
477  // Ignore the topic if it is not on the topic whitelist
478  if (isWhitelisted(topicName, _topicWhitelistPatterns)) {
479  latestTopics.emplace(topicName, datatype);
480  }
481  }
482 
483  if (const auto numIgnoredTopics = topicNamesAndTypes.size() - latestTopics.size()) {
484  ROS_DEBUG(
485  "%zu topics have been ignored as they do not match any pattern on the topic whitelist",
486  numIgnoredTopics);
487  }
488 
489  std::lock_guard<std::mutex> lock(_subscriptionsMutex);
490 
491  // Remove channels for which the topic does not exist anymore
492  std::vector<foxglove::ChannelId> channelIdsToRemove;
493  for (auto channelIt = _advertisedTopics.begin(); channelIt != _advertisedTopics.end();) {
494  const TopicAndDatatype topicAndDatatype = {channelIt->second.topic,
495  channelIt->second.schemaName};
496  if (latestTopics.find(topicAndDatatype) == latestTopics.end()) {
497  const auto channelId = channelIt->first;
498  channelIdsToRemove.push_back(channelId);
499  _subscriptions.erase(channelId);
500  ROS_DEBUG("Removed channel %d for topic \"%s\" (%s)", channelId,
501  topicAndDatatype.first.c_str(), topicAndDatatype.second.c_str());
502  channelIt = _advertisedTopics.erase(channelIt);
503  } else {
504  channelIt++;
505  }
506  }
507  _server->removeChannels(channelIdsToRemove);
508 
509  // Add new channels for new topics
510  std::vector<foxglove::ChannelWithoutId> channelsToAdd;
511  for (const auto& topicAndDatatype : latestTopics) {
512  if (std::find_if(_advertisedTopics.begin(), _advertisedTopics.end(),
513  [topicAndDatatype](const auto& channelIdAndChannel) {
514  const auto& channel = channelIdAndChannel.second;
515  return channel.topic == topicAndDatatype.first &&
516  channel.schemaName == topicAndDatatype.second;
517  }) != _advertisedTopics.end()) {
518  continue; // Topic already advertised
519  }
520 
521  foxglove::ChannelWithoutId newChannel{};
522  newChannel.topic = topicAndDatatype.first;
523  newChannel.schemaName = topicAndDatatype.second;
524  newChannel.encoding = ROS1_CHANNEL_ENCODING;
525 
526  try {
527  const auto msgDescription =
528  _rosTypeInfoProvider.getMessageDescription(topicAndDatatype.second);
529  if (msgDescription) {
530  newChannel.schema = msgDescription->message_definition;
531  } else {
532  ROS_WARN("Could not find definition for type %s", topicAndDatatype.second.c_str());
533 
534  // We still advertise the channel, but with an emtpy schema
535  newChannel.schema = "";
536  }
537  } catch (const std::exception& err) {
538  ROS_WARN("Failed to add channel for topic \"%s\" (%s): %s", topicAndDatatype.first.c_str(),
539  topicAndDatatype.second.c_str(), err.what());
540  continue;
541  }
542 
543  channelsToAdd.push_back(newChannel);
544  }
545 
546  const auto channelIds = _server->addChannels(channelsToAdd);
547  for (size_t i = 0; i < channelsToAdd.size(); ++i) {
548  const auto channelId = channelIds[i];
549  const auto& channel = channelsToAdd[i];
550  _advertisedTopics.emplace(channelId, channel);
551  ROS_DEBUG("Advertising channel %d for topic \"%s\" (%s)", channelId, channel.topic.c_str(),
552  channel.schemaName.c_str());
553  }
554  }
555 
556  void updateAdvertisedServices(const std::vector<std::string>& serviceNames) {
557  std::unique_lock<std::shared_mutex> lock(_servicesMutex);
558 
559  // Remove advertisements for services that have been removed
560  std::vector<foxglove::ServiceId> servicesToRemove;
561  for (const auto& service : _advertisedServices) {
562  const auto it =
563  std::find_if(serviceNames.begin(), serviceNames.end(), [service](const auto& serviceName) {
564  return serviceName == service.second.name;
565  });
566  if (it == serviceNames.end()) {
567  servicesToRemove.push_back(service.first);
568  }
569  }
570  for (auto serviceId : servicesToRemove) {
571  _advertisedServices.erase(serviceId);
572  }
573  _server->removeServices(servicesToRemove);
574 
575  // Advertise new services
576  std::vector<foxglove::ServiceWithoutId> newServices;
577  for (const auto& serviceName : serviceNames) {
578  if (std::find_if(_advertisedServices.begin(), _advertisedServices.end(),
579  [&serviceName](const auto& idWithService) {
580  return idWithService.second.name == serviceName;
581  }) != _advertisedServices.end()) {
582  continue; // Already advertised
583  }
584 
585  auto serviceTypeFuture = retrieveServiceType(serviceName);
586  if (serviceTypeFuture.wait_for(std::chrono::milliseconds(
587  SERVICE_TYPE_RETRIEVAL_TIMEOUT_MS)) != std::future_status::ready) {
588  ROS_WARN("Failed to retrieve type of service %s", serviceName.c_str());
589  continue;
590  }
591 
592  try {
593  const auto serviceType = serviceTypeFuture.get();
594  const auto srvDescription = _rosTypeInfoProvider.getServiceDescription(serviceType);
595 
597  service.name = serviceName;
598  service.type = serviceType;
599 
600  if (srvDescription) {
601  service.requestSchema = srvDescription->request->message_definition;
602  service.responseSchema = srvDescription->response->message_definition;
603  } else {
604  ROS_ERROR("Failed to retrieve type information for service '%s' of type '%s'",
605  serviceName.c_str(), serviceType.c_str());
606 
607  // We still advertise the channel, but with empty schema.
608  service.requestSchema = "";
609  service.responseSchema = "";
610  }
611  newServices.push_back(service);
612  } catch (const std::exception& e) {
613  ROS_ERROR("Failed to retrieve service type or service description of service %s: %s",
614  serviceName.c_str(), e.what());
615  continue;
616  }
617  }
618 
619  const auto serviceIds = _server->addServices(newServices);
620  for (size_t i = 0; i < serviceIds.size(); ++i) {
621  _advertisedServices.emplace(serviceIds[i], newServices[i]);
622  }
623  }
624 
625  void getParameters(const std::vector<std::string>& parameters,
626  const std::optional<std::string>& requestId, ConnectionHandle hdl) {
627  const bool allParametersRequested = parameters.empty();
628  std::vector<std::string> parameterNames = parameters;
629  if (allParametersRequested) {
630  if (!getMTNodeHandle().getParamNames(parameterNames)) {
631  const auto errMsg = "Failed to retrieve parameter names";
632  ROS_ERROR_STREAM(errMsg);
633  throw std::runtime_error(errMsg);
634  }
635  }
636 
637  bool success = true;
638  std::vector<foxglove::Parameter> params;
639  for (const auto& paramName : parameterNames) {
640  if (!isWhitelisted(paramName, _paramWhitelistPatterns)) {
641  if (allParametersRequested) {
642  continue;
643  } else {
644  ROS_ERROR("Parameter '%s' is not on the allowlist", paramName.c_str());
645  success = false;
646  }
647  }
648 
649  try {
650  XmlRpc::XmlRpcValue value;
651  getMTNodeHandle().getParam(paramName, value);
652  params.push_back(fromRosParam(paramName, value));
653  } catch (const std::exception& ex) {
654  ROS_ERROR("Invalid parameter '%s': %s", paramName.c_str(), ex.what());
655  success = false;
656  } catch (const XmlRpc::XmlRpcException& ex) {
657  ROS_ERROR("Invalid parameter '%s': %s", paramName.c_str(), ex.getMessage().c_str());
658  success = false;
659  } catch (...) {
660  ROS_ERROR("Invalid parameter '%s'", paramName.c_str());
661  success = false;
662  }
663  }
664 
665  _server->publishParameterValues(hdl, params, requestId);
666 
667  if (!success) {
668  throw std::runtime_error("Failed to retrieve one or multiple parameters");
669  }
670  }
671 
672  void setParameters(const std::vector<foxglove::Parameter>& parameters,
673  const std::optional<std::string>& requestId, ConnectionHandle hdl) {
675  auto nh = this->getMTNodeHandle();
676 
677  bool success = true;
678  for (const auto& param : parameters) {
679  const auto paramName = param.getName();
680  if (!isWhitelisted(paramName, _paramWhitelistPatterns)) {
681  ROS_ERROR("Parameter '%s' is not on the allowlist", paramName.c_str());
682  success = false;
683  continue;
684  }
685 
686  try {
687  const auto paramType = param.getType();
688  const auto paramValue = param.getValue();
689  if (paramType == ParameterType::PARAMETER_NOT_SET) {
690  nh.deleteParam(paramName);
691  } else {
692  nh.setParam(paramName, toRosParam(paramValue));
693  }
694  } catch (const std::exception& ex) {
695  ROS_ERROR("Failed to set parameter '%s': %s", paramName.c_str(), ex.what());
696  success = false;
697  } catch (const XmlRpc::XmlRpcException& ex) {
698  ROS_ERROR("Failed to set parameter '%s': %s", paramName.c_str(), ex.getMessage().c_str());
699  success = false;
700  } catch (...) {
701  ROS_ERROR("Failed to set parameter '%s'", paramName.c_str());
702  success = false;
703  }
704  }
705 
706  // If a request Id was given, send potentially updated parameters back to client
707  if (requestId) {
708  std::vector<std::string> parameterNames(parameters.size());
709  for (size_t i = 0; i < parameters.size(); ++i) {
710  parameterNames[i] = parameters[i].getName();
711  }
712  getParameters(parameterNames, requestId, hdl);
713  }
714 
715  if (!success) {
716  throw std::runtime_error("Failed to set one or multiple parameters");
717  }
718  }
719 
720  void subscribeParameters(const std::vector<std::string>& parameters,
722  const auto opVerb =
723  (op == foxglove::ParameterSubscriptionOperation::SUBSCRIBE) ? "subscribe" : "unsubscribe";
724  bool success = true;
725  for (const auto& paramName : parameters) {
726  if (!isWhitelisted(paramName, _paramWhitelistPatterns)) {
727  ROS_ERROR("Parameter '%s' is not allowlist", paramName.c_str());
728  continue;
729  }
730 
731  XmlRpc::XmlRpcValue params, result, payload;
732  params[0] = getName() + "2";
733  params[1] = xmlrpcServer.getServerURI();
734  params[2] = ros::names::resolve(paramName);
735 
736  const std::string opName = std::string(opVerb) + "Param";
737  if (ros::master::execute(opName, params, result, payload, false)) {
738  ROS_DEBUG("%s '%s'", opName.c_str(), paramName.c_str());
739  } else {
740  ROS_WARN("Failed to %s '%s': %s", opVerb, paramName.c_str(), result.toXml().c_str());
741  success = false;
742  }
743  }
744 
745  if (!success) {
746  throw std::runtime_error("Failed to " + std::string(opVerb) + " one or multiple parameters.");
747  }
748  }
749 
751  result[0] = 1;
752  result[1] = std::string("");
753  result[2] = 0;
754 
755  if (params.size() != 3) {
756  ROS_ERROR("Parameter update called with invalid parameter size: %d", params.size());
757  return;
758  }
759 
760  try {
761  const std::string paramName = ros::names::clean(params[1]);
762  const XmlRpc::XmlRpcValue paramValue = params[2];
763  const auto param = fromRosParam(paramName, paramValue);
764  _server->updateParameterValues({param});
765  } catch (const std::exception& ex) {
766  ROS_ERROR("Failed to update parameter: %s", ex.what());
767  } catch (const XmlRpc::XmlRpcException& ex) {
768  ROS_ERROR("Failed to update parameter: %s", ex.getMessage().c_str());
769  } catch (...) {
770  ROS_ERROR("Failed to update parameter");
771  }
772  }
773 
774  void logHandler(foxglove::WebSocketLogLevel level, char const* msg) {
775  switch (level) {
777  ROS_DEBUG("[WS] %s", msg);
778  break;
780  ROS_INFO("[WS] %s", msg);
781  break;
783  ROS_WARN("[WS] %s", msg);
784  break;
786  ROS_ERROR("[WS] %s", msg);
787  break;
789  ROS_FATAL("[WS] %s", msg);
790  break;
791  }
792  }
793 
795  const foxglove::ChannelId channelId, ConnectionHandle clientHandle,
797  const auto& msg = msgEvent.getConstMessage();
798  const auto receiptTimeNs = msgEvent.getReceiptTime().toNSec();
799  _server->sendMessage(clientHandle, channelId, receiptTimeNs, msg->buffer(), msg->size());
800  }
801 
802  void serviceRequest(const foxglove::ServiceRequest& request, ConnectionHandle clientHandle) {
803  std::shared_lock<std::shared_mutex> lock(_servicesMutex);
804  const auto serviceIt = _advertisedServices.find(request.serviceId);
805  if (serviceIt == _advertisedServices.end()) {
806  const auto errMsg =
807  "Service with id " + std::to_string(request.serviceId) + " does not exist";
808  ROS_ERROR_STREAM(errMsg);
809  throw foxglove::ServiceError(request.serviceId, errMsg);
810  }
811  const auto& serviceName = serviceIt->second.name;
812  const auto& serviceType = serviceIt->second.type;
813  ROS_DEBUG("Received a service request for service %s (%s)", serviceName.c_str(),
814  serviceType.c_str());
815 
816  if (!ros::service::exists(serviceName, false)) {
817  throw foxglove::ServiceError(request.serviceId,
818  "Service '" + serviceName + "' does not exist");
819  }
820 
821  const auto srvDescription = _rosTypeInfoProvider.getServiceDescription(serviceType);
822  if (!srvDescription) {
823  const auto errMsg =
824  "Failed to retrieve type information for service " + serviceName + "(" + serviceType + ")";
825  ROS_ERROR_STREAM(errMsg);
826  throw foxglove::ServiceError(request.serviceId, errMsg);
827  }
828 
829  GenericService genReq, genRes;
830  genReq.type = genRes.type = serviceType;
831  genReq.md5sum = genRes.md5sum = srvDescription->md5;
832  genReq.data = request.data;
833 
834  if (ros::service::call(serviceName, genReq, genRes)) {
836  res.serviceId = request.serviceId;
837  res.callId = request.callId;
838  res.encoding = request.encoding;
839  res.data = genRes.data;
840  _server->sendServiceResponse(clientHandle, res);
841  } else {
843  request.serviceId, "Failed to call service " + serviceName + "(" + serviceType + ")");
844  }
845  }
846 
847  bool hasCapability(const std::string& capability) {
848  return std::find(_capabilities.begin(), _capabilities.end(), capability) != _capabilities.end();
849  }
850 
851  std::unique_ptr<foxglove::ServerInterface<ConnectionHandle>> _server;
852  ros_babel_fish::IntegratedDescriptionProvider _rosTypeInfoProvider;
853  std::vector<std::regex> _topicWhitelistPatterns;
854  std::vector<std::regex> _paramWhitelistPatterns;
855  std::vector<std::regex> _serviceWhitelistPatterns;
857  std::unordered_map<foxglove::ChannelId, foxglove::ChannelWithoutId> _advertisedTopics;
858  std::unordered_map<foxglove::ChannelId, SubscriptionsByClient> _subscriptions;
859  std::unordered_map<foxglove::ServiceId, foxglove::ServiceWithoutId> _advertisedServices;
862  std::shared_mutex _publicationsMutex;
863  std::shared_mutex _servicesMutex;
865  size_t _maxUpdateMs = size_t(DEFAULT_MAX_UPDATE_MS);
866  size_t _updateCount = 0;
868  bool _useSimTime = false;
869  std::vector<std::string> _capabilities;
870  std::atomic<bool> _subscribeGraphUpdates = false;
871 };
872 
873 } // namespace foxglove_bridge
874 
const char FOXGLOVE_BRIDGE_GIT_HASH[]
const boost::shared_ptr< ConstMessage > & getConstMessage() const
ClientAdvertisement advertisement
Definition: common.hpp:87
void updateAdvertisedServices(const std::vector< std::string > &serviceNames)
ROSCPP_DECL bool getParamNames(std::vector< std::string > &keys)
bool param(const std::string &param_name, T &param_val, const T &default_val)
#define ROS_FATAL(...)
std::string message_definition
std::unordered_map< foxglove::ServiceId, foxglove::ServiceWithoutId > _advertisedServices
constexpr size_t DEFAULT_SEND_BUFFER_LIMIT_BYTES
std::pair< std::string, std::string > TopicAndDatatype
Timer createTimer(Rate r, Handler h, Obj o, bool oneshot=false, bool autostart=true) const
Subscriber subscribe(const std::string &topic, uint32_t queue_size, void(T::*fp)(M), T *obj, const TransportHints &transport_hints=TransportHints())
std::vector< std::regex > _serviceWhitelistPatterns
uint64_t toNSec() const
XmlRpc::XmlRpcValue toRosParam(const foxglove::ParameterValue &param)
bool call(const std::string &service_name, MReq &req, MRes &res)
std::vector< std::regex > clientTopicWhitelistPatterns
void getParameters(const std::vector< std::string > &parameters, const std::optional< std::string > &requestId, ConnectionHandle hdl)
void stop()
void unsubscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle)
std::size_t operator()(const std::pair< T1, T2 > &pair) const
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
const std::string & getMessage() const
bool isWhitelisted(const std::string &name, const std::vector< std::regex > &regexPatterns)
Definition: regex_utils.hpp:10
void subscribe(foxglove::ChannelId channelId, ConnectionHandle clientHandle)
std::string responseSchema
Definition: common.hpp:116
ROSCPP_DECL std::string resolve(const std::string &name, bool remap=true)
std::vector< std::regex > parseRegexPatterns(const std::vector< std::string > &strings)
ROSCPP_DECL std::string clean(const std::string &name)
uint32_t ChannelId
Definition: common.hpp:25
bool hasCapability(const std::string &capability)
ros::NodeHandle & getPrivateNodeHandle() const
std::pair< std::string, std::string > TopicAndDatatype
void subscribeParameters(const std::vector< std::string > &parameters, foxglove::ParameterSubscriptionOperation op, ConnectionHandle)
constexpr uint32_t PUBLICATION_QUEUE_LENGTH
#define ROS_WARN(...)
const char FOXGLOVE_BRIDGE_VERSION[]
std::vector< uint8_t > data
Definition: common.hpp:132
std::future< std::string > retrieveServiceType(const std::string &serviceName)
std::unordered_map< std::string, std::string > metadata
constexpr int SERVICE_TYPE_RETRIEVAL_TIMEOUT_MS
constexpr uint32_t SUBSCRIPTION_QUEUE_LENGTH
const std::string & getName() const
std::function< void(ChannelId, ConnectionHandle)> subscribeHandler
void setParameters(const std::vector< foxglove::Parameter > &parameters, const std::optional< std::string > &requestId, ConnectionHandle hdl)
std::unordered_map< foxglove::ChannelId, SubscriptionsByClient > _subscriptions
std::unordered_map< std::string, std::unordered_set< std::string > > MapOfSets
const char * datatype()
uint32_t ClientChannelId
Definition: common.hpp:26
constexpr double MIN_UPDATE_PERIOD_MS
void logHandler(foxglove::WebSocketLogLevel level, char const *msg)
#define ROS_INFO(...)
bool getParam(const std::string &key, std::string &s) const
void updateAdvertisedTopicsAndServices(const ros::TimerEvent &)
void clientMessage(const foxglove::ClientMessage &clientMsg, ConnectionHandle clientHandle)
constexpr char CAPABILITY_TIME[]
Definition: common.hpp:14
ROSCPP_DECL bool ok()
ROSCPP_DECL bool execute(const std::string &method, const XmlRpc::XmlRpcValue &request, XmlRpc::XmlRpcValue &response, XmlRpc::XmlRpcValue &payload, bool wait_for_master)
std::map< ConnectionHandle, ClientPublications, std::owner_less<> > PublicationsByClient
std::map< ConnectionHandle, ros::Subscriber, std::owner_less<> > SubscriptionsByClient
Publisher advertise(const std::string &topic, uint32_t queue_size, bool latch=false)
void parameterUpdates(XmlRpc::XmlRpcValue &params, XmlRpc::XmlRpcValue &result)
#define ROS_WARN_STREAM(args)
ros::NodeHandle & getMTNodeHandle() const
ros::Time getReceiptTime() const
const char * WebSocketUserAgent()
constexpr char CAPABILITY_SERVICES[]
Definition: common.hpp:17
std::string toXml() const
bool bind(const std::string &function_name, const XMLRPCFunc &cb)
std::vector< std::string > capabilities
ParameterSubscriptionOperation
Definition: parameter.hpp:11
void serviceRequest(const foxglove::ServiceRequest &request, ConnectionHandle clientHandle)
websocketpp::connection_hdl ConnectionHandle
ROSCPP_DECL bool exists(const std::string &service_name, bool print_failure_reason)
constexpr char ROS1_CHANNEL_ENCODING[]
std::string requestSchema
Definition: common.hpp:115
#define ROS_ERROR_STREAM(args)
std::unordered_map< foxglove::ChannelId, foxglove::ChannelWithoutId > _advertisedTopics
std::unordered_map< foxglove::ClientChannelId, ros::Publisher > ClientPublications
foxglove::Parameter fromRosParam(const std::string &name, const XmlRpc::XmlRpcValue &value)
ClientChannelId channelId
Definition: common.hpp:76
std::unique_ptr< foxglove::ServerInterface< ConnectionHandle > > _server
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
void rosMessageHandler(const foxglove::ChannelId channelId, ConnectionHandle clientHandle, const ros::MessageEvent< ros_babel_fish::BabelFishMessage const > &msgEvent)
void clientUnadvertise(foxglove::ClientChannelId channelId, ConnectionHandle clientHandle)
#define ROS_ERROR(...)
void clientAdvertise(const foxglove::ClientAdvertisement &channel, ConnectionHandle clientHandle)
WebSocketLogLevel
Definition: common.hpp:41
const std::string & getServerURI() const
std::vector< std::string > supportedEncodings
constexpr std::array< const char *, 5 > DEFAULT_CAPABILITIES
Definition: common.hpp:20
ros_babel_fish::IntegratedDescriptionProvider _rosTypeInfoProvider
#define ROS_DEBUG(...)


foxglove_bridge
Author(s): Foxglove
autogenerated on Mon Jul 3 2023 02:12:22