MqttClient.ros2.cpp
Go to the documentation of this file.
1 /*
2 ==============================================================================
3 MIT License
4 
5 Copyright 2022 Institute for Automotive Engineering of RWTH Aachen University.
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in all
15 copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 SOFTWARE.
24 ==============================================================================
25 */
26 
27 
28 #include <algorithm>
29 #include <cstdint>
30 #include <cstring>
31 #include <vector>
32 
34 #include <mqtt_client_interfaces/msg/ros_msg_type.hpp>
35 #include <rcpputils/env.hpp>
36 #include <std_msgs/msg/bool.hpp>
37 #include <std_msgs/msg/char.hpp>
38 #include <std_msgs/msg/float32.hpp>
39 #include <std_msgs/msg/float64.hpp>
40 #include <std_msgs/msg/int16.hpp>
41 #include <std_msgs/msg/int32.hpp>
42 #include <std_msgs/msg/int64.hpp>
43 #include <std_msgs/msg/int8.hpp>
44 #include <std_msgs/msg/string.hpp>
45 #include <std_msgs/msg/u_int16.hpp>
46 #include <std_msgs/msg/u_int32.hpp>
47 #include <std_msgs/msg/u_int64.hpp>
48 #include <std_msgs/msg/u_int8.hpp>
49 
50 #include <rclcpp_components/register_node_macro.hpp>
51 RCLCPP_COMPONENTS_REGISTER_NODE(mqtt_client::MqttClient)
52 
53 
54 namespace mqtt_client {
55 
56 
58  "mqtt_client/ros_msg_type/";
59 
60 const std::string MqttClient::kLatencyRosTopicPrefix = "~/latencies/";
61 
62 
79  const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
80  const std::string& msg_type, std::string& primitive) {
81 
82  bool found_primitive = true;
83 
84  if (msg_type == "std_msgs/msg/String") {
85  std_msgs::msg::String msg;
86  deserializeRosMessage(*serialized_msg, msg);
87  primitive = msg.data;
88  } else if (msg_type == "std_msgs/msg/Bool") {
89  std_msgs::msg::Bool msg;
90  deserializeRosMessage(*serialized_msg, msg);
91  primitive = msg.data ? "true" : "false";
92  } else if (msg_type == "std_msgs/msg/Char") {
93  std_msgs::msg::Char msg;
94  deserializeRosMessage(*serialized_msg, msg);
95  primitive = std::to_string(msg.data);
96  } else if (msg_type == "std_msgs/msg/UInt8") {
97  std_msgs::msg::UInt8 msg;
98  deserializeRosMessage(*serialized_msg, msg);
99  primitive = std::to_string(msg.data);
100  } else if (msg_type == "std_msgs/msg/UInt16") {
101  std_msgs::msg::UInt16 msg;
102  deserializeRosMessage(*serialized_msg, msg);
103  primitive = std::to_string(msg.data);
104  } else if (msg_type == "std_msgs/msg/UInt32") {
105  std_msgs::msg::UInt32 msg;
106  deserializeRosMessage(*serialized_msg, msg);
107  primitive = std::to_string(msg.data);
108  } else if (msg_type == "std_msgs/msg/UInt64") {
109  std_msgs::msg::UInt64 msg;
110  deserializeRosMessage(*serialized_msg, msg);
111  primitive = std::to_string(msg.data);
112  } else if (msg_type == "std_msgs/msg/Int8") {
113  std_msgs::msg::Int8 msg;
114  deserializeRosMessage(*serialized_msg, msg);
115  primitive = std::to_string(msg.data);
116  } else if (msg_type == "std_msgs/msg/Int16") {
117  std_msgs::msg::Int16 msg;
118  deserializeRosMessage(*serialized_msg, msg);
119  primitive = std::to_string(msg.data);
120  } else if (msg_type == "std_msgs/msg/Int32") {
121  std_msgs::msg::Int32 msg;
122  deserializeRosMessage(*serialized_msg, msg);
123  primitive = std::to_string(msg.data);
124  } else if (msg_type == "std_msgs/msg/Int64") {
125  std_msgs::msg::Int64 msg;
126  deserializeRosMessage(*serialized_msg, msg);
127  primitive = std::to_string(msg.data);
128  } else if (msg_type == "std_msgs/msg/Float32") {
129  std_msgs::msg::Float32 msg;
130  deserializeRosMessage(*serialized_msg, msg);
131  primitive = std::to_string(msg.data);
132  } else if (msg_type == "std_msgs/msg/Float64") {
133  std_msgs::msg::Float64 msg;
134  deserializeRosMessage(*serialized_msg, msg);
135  primitive = std::to_string(msg.data);
136  } else {
137  found_primitive = false;
138  }
139 
140  return found_primitive;
141 }
142 
143 
144 MqttClient::MqttClient(const rclcpp::NodeOptions& options) : Node("mqtt_client", options) {
145 
146  loadParameters();
147  setup();
148 }
149 
150 
152 
153  rcl_interfaces::msg::ParameterDescriptor param_desc;
154 
155  param_desc.description = "IP address or hostname of the machine running the MQTT broker";
156  declare_parameter("broker.host", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
157  param_desc.description = "port the MQTT broker is listening on";
158  declare_parameter("broker.port", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
159  param_desc.description = "username used for authenticating to the broker (if empty, will try to connect anonymously)";
160  declare_parameter("broker.user", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
161  param_desc.description = "password used for authenticating to the broker";
162  declare_parameter("broker.pass", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
163  param_desc.description = "whether to connect via SSL/TLS";
164  declare_parameter("broker.tls.enabled", rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
165  param_desc.description = "CA certificate file trusted by client (relative to ROS_HOME)";
166  declare_parameter("broker.tls.ca_certificate", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
167 
168  param_desc.description = "unique ID used to identify the client (broker may allow empty ID and automatically generate one)";
169  declare_parameter("client.id", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
170  param_desc.description = "maximum number of messages buffered by the bridge when not connected to broker (only available if client ID is not empty)";
171  declare_parameter("client.buffer.size", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
172  param_desc.description = "directory used to buffer messages when not connected to broker (relative to ROS_HOME)";
173  declare_parameter("client.buffer.directory", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
174  param_desc.description = "topic used for this client's last-will message (no last will, if not specified)";
175  declare_parameter("client.last_will.topic", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
176  param_desc.description = "last-will message";
177  declare_parameter("client.last_will.message", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
178  param_desc.description = "QoS value for last-will message";
179  declare_parameter("client.last_will.qos", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
180  param_desc.description = "whether to retain last-will message";
181  declare_parameter("client.last_will.retained", rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
182  param_desc.description = "whether to use a clean session for this client";
183  declare_parameter("client.clean_session", rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
184  param_desc.description = "keep-alive interval in seconds";
185  declare_parameter("client.keep_alive_interval", rclcpp::ParameterType::PARAMETER_DOUBLE, param_desc);
186  param_desc.description = "maximum number of inflight messages";
187  declare_parameter("client.max_inflight", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
188  param_desc.description = "client certificate file (only needed if broker requires client certificates; relative to ROS_HOME)";
189  declare_parameter("client.tls.certificate", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
190  param_desc.description = "client private key file (relative to ROS_HOME)";
191  declare_parameter("client.tls.key", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
192  param_desc.description = "client private key password";
193  declare_parameter("client.tls.password", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
194 
195  param_desc.description = "The list of topics to bridge from ROS to MQTT";
196  const auto ros2mqtt_ros_topics = declare_parameter<std::vector<std::string>>("bridge.ros2mqtt.ros_topics", std::vector<std::string>(), param_desc);
197  for (const auto& ros_topic : ros2mqtt_ros_topics)
198  {
199  param_desc.description = "MQTT topic on which the corresponding ROS messages are sent to the broker";
200  declare_parameter(fmt::format("bridge.ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
201  param_desc.description = "whether to publish as primitive message";
202  declare_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
203  param_desc.description = "whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)";
204  declare_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
205  param_desc.description = "ROS subscriber queue size";
206  declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
207  param_desc.description = "MQTT QoS value";
208  declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
209  param_desc.description = "whether to retain MQTT message";
210  declare_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.retained", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
211  }
212 
213  param_desc.description = "The list of topics to bridge from MQTT to ROS";
214  const auto mqtt2ros_mqtt_topics = declare_parameter<std::vector<std::string>>("bridge.mqtt2ros.mqtt_topics", std::vector<std::string>(), param_desc);
215  for (const auto& mqtt_topic : mqtt2ros_mqtt_topics)
216  {
217  param_desc.description = "ROS topic on which corresponding MQTT messages are published";
218  declare_parameter(fmt::format("bridge.mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
219  param_desc.description = "whether to publish as primitive message (if coming from non-ROS MQTT client)";
220  declare_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
221  param_desc.description = "MQTT QoS value";
222  declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
223  param_desc.description = "ROS publisher queue size";
224  declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
225  param_desc.description = "whether to latch ROS message";
226  declare_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
227  }
228 
229  // load broker parameters from parameter server
230  std::string broker_tls_ca_certificate;
231  loadParameter("broker.host", broker_config_.host, "localhost");
232  loadParameter("broker.port", broker_config_.port, 1883);
233  if (loadParameter("broker.user", broker_config_.user)) {
234  loadParameter("broker.pass", broker_config_.pass, "");
235  }
236  if (loadParameter("broker.tls.enabled", broker_config_.tls.enabled, false)) {
237  loadParameter("broker.tls.ca_certificate", broker_tls_ca_certificate,
238  "/etc/ssl/certs/ca-certificates.crt");
239  }
240 
241  // load client parameters from parameter server
242  std::string client_buffer_directory, client_tls_certificate, client_tls_key;
243  loadParameter("client.id", client_config_.id, "");
246  loadParameter("client.buffer.size", client_config_.buffer.size, 0);
247  loadParameter("client.buffer.directory", client_buffer_directory, "buffer");
248  } else {
249  RCLCPP_WARN(get_logger(),
250  "Client buffer can not be enabled when client ID is empty");
251  }
252  if (loadParameter("client.last_will.topic", client_config_.last_will.topic)) {
253  loadParameter("client.last_will.message", client_config_.last_will.message,
254  "offline");
255  loadParameter("client.last_will.qos", client_config_.last_will.qos, 0);
256  loadParameter("client.last_will.retained",
258  }
259  loadParameter("client.clean_session", client_config_.clean_session, true);
260  loadParameter("client.keep_alive_interval",
262  loadParameter("client.max_inflight", client_config_.max_inflight, 65535);
263  if (broker_config_.tls.enabled) {
264  if (loadParameter("client.tls.certificate", client_tls_certificate)) {
265  loadParameter("client.tls.key", client_tls_key);
266  loadParameter("client.tls.password", client_config_.tls.password);
267  loadParameter("client.tls.version", client_config_.tls.version);
268  loadParameter("client.tls.verify", client_config_.tls.verify);
269  loadParameter("client.tls.alpn_protos", client_config_.tls.alpn_protos);
270  }
271  }
272 
273  // resolve filepaths
274  broker_config_.tls.ca_certificate = resolvePath(broker_tls_ca_certificate);
275  client_config_.buffer.directory = resolvePath(client_buffer_directory);
276  client_config_.tls.certificate = resolvePath(client_tls_certificate);
277  client_config_.tls.key = resolvePath(client_tls_key);
278 
279  // parse bridge parameters
280 
281  // ros2mqtt
282  for (const auto& ros_topic : ros2mqtt_ros_topics) {
283 
284  rclcpp::Parameter mqtt_topic_param;
285  if (get_parameter(fmt::format("bridge.ros2mqtt.{}.mqtt_topic", ros_topic), mqtt_topic_param)) {
286 
287  // ros2mqtt[k]/ros_topic and ros2mqtt[k]/mqtt_topic
288  const std::string mqtt_topic = mqtt_topic_param.as_string();
289  Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
290  ros2mqtt.mqtt.topic = mqtt_topic;
291 
292  // ros2mqtt[k]/primitive
293  rclcpp::Parameter primitive_param;
294  if (get_parameter(fmt::format("bridge.ros2mqtt.{}.primitive", ros_topic), primitive_param))
295  ros2mqtt.primitive = primitive_param.as_bool();
296 
297  // ros2mqtt[k]/inject_timestamp
298  rclcpp::Parameter stamped_param;
299  if (get_parameter(fmt::format("bridge.ros2mqtt.{}.inject_timestamp", ros_topic), stamped_param))
300  ros2mqtt.stamped = stamped_param.as_bool();
301  if (ros2mqtt.stamped && ros2mqtt.primitive) {
302  RCLCPP_WARN(
303  get_logger(),
304  "Timestamp will not be injected into primitive messages on ROS "
305  "topic '%s'",
306  ros_topic.c_str());
307  ros2mqtt.stamped = false;
308  }
309 
310  // ros2mqtt[k]/advanced/ros/queue_size
311  rclcpp::Parameter queue_size_param;
312  if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic),
313  queue_size_param))
314  ros2mqtt.ros.queue_size = queue_size_param.as_int();
315 
316  // ros2mqtt[k]/advanced/mqtt/qos
317  rclcpp::Parameter qos_param;
318  if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), qos_param))
319  ros2mqtt.mqtt.qos = qos_param.as_int();
320 
321  // ros2mqtt[k]/advanced/mqtt/retained
322  rclcpp::Parameter retained_param;
323  if (get_parameter(fmt::format("bridge.ros2mqtt.{}.advanced.mqtt.retained", ros_topic), retained_param))
324  ros2mqtt.mqtt.retained = retained_param.as_bool();
325 
326  RCLCPP_INFO(get_logger(), "Bridging %sROS topic '%s' to MQTT topic '%s' %s",
327  ros2mqtt.primitive ? "primitive " : "", ros_topic.c_str(),
328  ros2mqtt.mqtt.topic.c_str(),
329  ros2mqtt.stamped ? "and measuring latency" : "");
330  } else {
331  RCLCPP_WARN(get_logger(),
332  fmt::format("Parameter 'bridge.ros2mqtt.{}' is missing subparameter "
333  "'mqtt_topic', will be ignored", ros_topic).c_str());
334  }
335  }
336 
337  // mqtt2ros
338  for (const auto& mqtt_topic : mqtt2ros_mqtt_topics) {
339 
340  rclcpp::Parameter ros_topic_param;
341  if (get_parameter(fmt::format("bridge.mqtt2ros.{}.ros_topic", mqtt_topic), ros_topic_param)) {
342 
343  // mqtt2ros[k]/mqtt_topic and mqtt2ros[k]/ros_topic
344  const std::string ros_topic = ros_topic_param.as_string();
345  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
346  mqtt2ros.ros.topic = ros_topic;
347 
348  // mqtt2ros[k]/primitive
349  rclcpp::Parameter primitive_param;
350  if (get_parameter(fmt::format("bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param))
351  mqtt2ros.primitive = primitive_param.as_bool();
352 
353  // mqtt2ros[k]/advanced/mqtt/qos
354  rclcpp::Parameter qos_param;
355  if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param))
356  mqtt2ros.mqtt.qos = qos_param.as_int();
357 
358  // mqtt2ros[k]/advanced/ros/queue_size
359  rclcpp::Parameter queue_size_param;
360  if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic),
361  queue_size_param))
362  mqtt2ros.ros.queue_size = queue_size_param.as_int();
363 
364  // mqtt2ros[k]/advanced/ros/latched
365  rclcpp::Parameter latched_param;
366  if (get_parameter(fmt::format("bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), latched_param)) {
367  mqtt2ros.ros.latched = latched_param.as_bool();
368  RCLCPP_WARN(get_logger(),
369  fmt::format("Parameter 'bridge.mqtt2ros.{}.advanced.ros.latched' is ignored "
370  "since ROS 2 does not easily support latched topics.", mqtt_topic).c_str());
371  }
372 
373  RCLCPP_INFO(get_logger(), "Bridging MQTT topic '%s' to %sROS topic '%s'",
374  mqtt_topic.c_str(), mqtt2ros.primitive ? "primitive " : "",
375  mqtt2ros.ros.topic.c_str());
376  }
377  else {
378  RCLCPP_WARN(get_logger(),
379  fmt::format("Parameter 'bridge.ros2mqtt.{}' is missing subparameter "
380  "'ros_topic', will be ignored", mqtt_topic).c_str());
381  }
382  }
383 
384  if (ros2mqtt_.empty() && mqtt2ros_.empty()) {
385  RCLCPP_ERROR(get_logger(),
386  "No valid ROS-MQTT bridge found in parameter 'bridge'");
387  exit(EXIT_FAILURE);
388  }
389 }
390 
391 
392 bool MqttClient::loadParameter(const std::string& key, std::string& value) {
393  bool found = get_parameter(key, value);
394  if (found)
395  RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
396  value.c_str());
397  return found;
398 }
399 
400 
401 bool MqttClient::loadParameter(const std::string& key, std::string& value,
402  const std::string& default_value) {
403  bool found = get_parameter_or(key, value, default_value);
404  if (!found)
405  RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
406  key.c_str(), default_value.c_str());
407  if (found)
408  RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
409  value.c_str());
410  return found;
411 }
412 
413 
414 std::filesystem::path MqttClient::resolvePath(const std::string& path_string) {
415 
416  std::filesystem::path path(path_string);
417  if (path_string.empty()) return path;
418  if (!path.has_root_path()) {
419  std::string ros_home = rcpputils::get_env_var("ROS_HOME");
420  if (ros_home.empty())
421  ros_home = std::string(std::filesystem::current_path());
422  path = std::filesystem::path(ros_home);
423  path.append(path_string);
424  }
425  if (!std::filesystem::exists(path))
426  RCLCPP_WARN(get_logger(), "Requested path '%s' does not exist",
427  std::string(path).c_str());
428  return path;
429 }
430 
431 
432 void MqttClient::setup() {
433 
434  // pre-compute timestamp length
435  builtin_interfaces::msg::Time tmp_stamp;
436  rclcpp::SerializedMessage tmp_serialized_stamp;
437  serializeRosMessage(tmp_stamp, tmp_serialized_stamp);
438  stamp_length_ = tmp_serialized_stamp.size();
439 
440  // initialize MQTT client
441  setupClient();
442 
443  // connect to MQTT broker
444  connect();
445 
446  // create ROS service server
448  create_service<mqtt_client_interfaces::srv::IsConnected>(
449  "is_connected", std::bind(&MqttClient::isConnectedService, this,
450  std::placeholders::_1, std::placeholders::_2));
451 
452 
453  // setup subscribers; check for new types every second
455  create_wall_timer(std::chrono::duration<double>(1.0),
456  std::bind(&MqttClient::setupSubscriptions, this));
457 }
458 
459 
461 
462  // get info of all topics
463  const auto all_topics_and_types = get_topic_names_and_types();
464 
465  // check for ros2mqtt topics
466  for (auto& [ros_topic, ros2mqtt] : ros2mqtt_) {
467  if (all_topics_and_types.count(ros_topic)) {
468 
469  // check if message type has changed
470  const std::string& msg_type = all_topics_and_types.at(ros_topic)[0];
471  if (msg_type == ros2mqtt.ros.msg_type) continue;
472  ros2mqtt.ros.msg_type = msg_type;
473 
474  // create new generic subscription, if message type has changed
475  std::function<void(const std::shared_ptr<rclcpp::SerializedMessage> msg)>
476  bound_callback_func = std::bind(&MqttClient::ros2mqtt, this,
477  std::placeholders::_1, ros_topic);
478  try {
479  ros2mqtt.ros.subscriber = create_generic_subscription(
480  ros_topic, msg_type, ros2mqtt.ros.queue_size, bound_callback_func);
481  } catch (rclcpp::exceptions::RCLError& e) {
482  RCLCPP_ERROR(get_logger(), "Failed to create generic subscriber: %s",
483  e.what());
484  return;
485  }
486  RCLCPP_INFO(get_logger(), "Subscribed ROS topic '%s' of type '%s'",
487  ros_topic.c_str(), msg_type.c_str());
488  }
489  }
490 }
491 
492 
494 
495  // basic client connection options
496  connect_options_.set_automatic_reconnect(true);
497  connect_options_.set_clean_session(client_config_.clean_session);
498  connect_options_.set_keep_alive_interval(client_config_.keep_alive_interval);
499  connect_options_.set_max_inflight(client_config_.max_inflight);
500 
501  // user authentication
502  if (!broker_config_.user.empty()) {
503  connect_options_.set_user_name(broker_config_.user);
504  connect_options_.set_password(broker_config_.pass);
505  }
506 
507  // last will
508  if (!client_config_.last_will.topic.empty()) {
509  mqtt::will_options will(
512  connect_options_.set_will(will);
513  }
514 
515  // SSL/TLS
516  if (broker_config_.tls.enabled) {
517  mqtt::ssl_options ssl;
518  ssl.set_trust_store(broker_config_.tls.ca_certificate);
519  if (!client_config_.tls.certificate.empty() &&
520  !client_config_.tls.key.empty()) {
521  ssl.set_key_store(client_config_.tls.certificate);
522  ssl.set_private_key(client_config_.tls.key);
523  if (!client_config_.tls.password.empty())
524  ssl.set_private_key_password(client_config_.tls.password);
525  }
526  ssl.set_ssl_version(client_config_.tls.version);
527  ssl.set_verify(client_config_.tls.verify);
528  ssl.set_alpn_protos(client_config_.tls.alpn_protos);
529  connect_options_.set_ssl(ssl);
530  }
531 
532  // create MQTT client
533  const std::string protocol = broker_config_.tls.enabled ? "ssl" : "tcp";
534  const std::string uri = fmt::format("{}://{}:{}", protocol, broker_config_.host,
536  try {
538  client_ = std::shared_ptr<mqtt::async_client>(new mqtt::async_client(
541  } else {
542  client_ = std::shared_ptr<mqtt::async_client>(
543  new mqtt::async_client(uri, client_config_.id));
544  }
545  } catch (const mqtt::exception& e) {
546  RCLCPP_ERROR(get_logger(), "Client could not be initialized: %s", e.what());
547  exit(EXIT_FAILURE);
548  }
549 
550  // setup MQTT callbacks
551  client_->set_callback(*this);
552 }
553 
554 
555 void MqttClient::connect() {
556 
557  std::string as_client =
558  client_config_.id.empty()
559  ? ""
560  : std::string(" as '") + client_config_.id + std::string("'");
561  RCLCPP_INFO(get_logger(), "Connecting to broker at '%s'%s ...",
562  client_->get_server_uri().c_str(), as_client.c_str());
563 
564  try {
565  client_->connect(connect_options_, nullptr, *this);
566  } catch (const mqtt::exception& e) {
567  RCLCPP_ERROR(get_logger(), "Connection to broker failed: %s", e.what());
568  exit(EXIT_FAILURE);
569  }
570 }
571 
572 
574  const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
575  const std::string& ros_topic) {
576 
577  Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
578  std::string mqtt_topic = ros2mqtt.mqtt.topic;
579  std::vector<uint8_t> payload_buffer;
580 
581  // gather information on ROS message type in special ROS message
582  mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
583  ros_msg_type.name = ros2mqtt.ros.msg_type;
584  ros_msg_type.stamped = ros2mqtt.stamped;
585 
586  RCLCPP_DEBUG(get_logger(), "Received ROS message of type '%s' on topic '%s'",
587  ros_msg_type.name.c_str(), ros_topic.c_str());
588 
589  if (ros2mqtt.primitive) { // publish as primitive (string) message
590 
591  // resolve ROS messages to primitive strings if possible
592  std::string payload;
593  bool found_primitive =
594  primitiveRosMessageToString(serialized_msg, ros_msg_type.name, payload);
595  if (found_primitive) {
596  payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
597  } else {
598  RCLCPP_WARN(get_logger(),
599  "Cannot send ROS message of type '%s' as primitive message, "
600  "check supported primitive types",
601  ros_msg_type.name.c_str());
602  return;
603  }
604 
605  } else { // publish as complete ROS message incl. ROS message type
606 
607  // serialize ROS message type
608  rclcpp::SerializedMessage serialized_ros_msg_type;
609  serializeRosMessage(ros_msg_type, serialized_ros_msg_type);
610  uint32_t msg_type_length = serialized_ros_msg_type.size();
611  std::vector<uint8_t> msg_type_buffer = std::vector<uint8_t>(
612  serialized_ros_msg_type.get_rcl_serialized_message().buffer,
613  serialized_ros_msg_type.get_rcl_serialized_message().buffer + msg_type_length);
614 
615  // send ROS message type information to MQTT broker
616  mqtt_topic = kRosMsgTypeMqttTopicPrefix + ros2mqtt.mqtt.topic;
617  try {
618  RCLCPP_DEBUG(get_logger(),
619  "Sending ROS message type to MQTT broker on topic '%s' ...",
620  mqtt_topic.c_str());
621  mqtt::message_ptr mqtt_msg =
622  mqtt::make_message(mqtt_topic, msg_type_buffer.data(),
623  msg_type_buffer.size(), ros2mqtt.mqtt.qos, true);
624  client_->publish(mqtt_msg);
625  } catch (const mqtt::exception& e) {
626  RCLCPP_WARN(
627  get_logger(),
628  "Publishing ROS message type information to MQTT topic '%s' failed: %s",
629  mqtt_topic.c_str(), e.what());
630  }
631 
632  // build MQTT payload for ROS message (R) as [R]
633  // or [S, R] if timestamp (S) is included
634  uint32_t msg_length = serialized_msg->size();
635  uint32_t payload_length = msg_length;
636  uint32_t msg_offset = 0;
637  if (ros2mqtt.stamped) {
638 
639  // allocate buffer with appropriate size to hold [S, R]
640  msg_offset += stamp_length_;
641  payload_length += stamp_length_;
642  payload_buffer.resize(payload_length);
643 
644  // copy serialized ROS message to payload [-, R]
645  std::copy(serialized_msg->get_rcl_serialized_message().buffer,
646  serialized_msg->get_rcl_serialized_message().buffer + msg_length,
647  payload_buffer.begin() + msg_offset);
648  } else {
649 
650  // directly build payload buffer on top of serialized message
651  payload_buffer = std::vector<uint8_t>(
652  serialized_msg->get_rcl_serialized_message().buffer,
653  serialized_msg->get_rcl_serialized_message().buffer + msg_length);
654  }
655 
656  // inject timestamp as final step
657  if (ros2mqtt.stamped) {
658 
659  // take current timestamp
660  builtin_interfaces::msg::Time stamp(rclcpp::Clock(RCL_SYSTEM_TIME).now());
661 
662  // copy serialized timestamp to payload [S, R]
663  rclcpp::SerializedMessage serialized_stamp;
664  serializeRosMessage(stamp, serialized_stamp);
665  std::copy(
666  serialized_stamp.get_rcl_serialized_message().buffer,
667  serialized_stamp.get_rcl_serialized_message().buffer + stamp_length_,
668  payload_buffer.begin());
669  }
670  }
671 
672  // send ROS message to MQTT broker
673  mqtt_topic = ros2mqtt.mqtt.topic;
674  try {
675  RCLCPP_DEBUG(
676  get_logger(),
677  "Sending ROS message of type '%s' to MQTT broker on topic '%s' ...",
678  ros_msg_type.name.c_str(), mqtt_topic.c_str());
679  mqtt::message_ptr mqtt_msg = mqtt::make_message(
680  mqtt_topic, payload_buffer.data(), payload_buffer.size(),
681  ros2mqtt.mqtt.qos, ros2mqtt.mqtt.retained);
682  client_->publish(mqtt_msg);
683  } catch (const mqtt::exception& e) {
684  RCLCPP_WARN(
685  get_logger(),
686  "Publishing ROS message type information to MQTT topic '%s' failed: %s",
687  mqtt_topic.c_str(), e.what());
688  }
689 }
690 
691 
692 void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
693  const rclcpp::Time& arrival_stamp) {
694 
695  std::string mqtt_topic = mqtt_msg->get_topic();
696  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
697  auto& payload = mqtt_msg->get_payload_ref();
698  uint32_t payload_length = static_cast<uint32_t>(payload.size());
699 
700  // read MQTT payload for ROS message (R) as [R]
701  // or [S, R] if timestamp (S) is included
702  uint32_t msg_length = payload_length;
703  uint32_t msg_offset = 0;
704 
705  // if stamped, compute latency
706  if (mqtt2ros.stamped) {
707 
708  // copy stamp to generic message buffer
709  rclcpp::SerializedMessage serialized_stamp(stamp_length_);
710  std::memcpy(serialized_stamp.get_rcl_serialized_message().buffer,
711  &(payload[msg_offset]), stamp_length_);
712  serialized_stamp.get_rcl_serialized_message().buffer_length = stamp_length_;
713 
714  // deserialize stamp
715  builtin_interfaces::msg::Time stamp;
716  deserializeRosMessage(serialized_stamp, stamp);
717 
718  // compute ROS2MQTT latency
719  rclcpp::Duration latency = arrival_stamp - stamp;
720  std_msgs::msg::Float64 latency_msg;
721  latency_msg.data = latency.seconds();
722 
723  // publish latency
724  if (!mqtt2ros.ros.latency_publisher) {
725  std::string latency_topic = kLatencyRosTopicPrefix + mqtt2ros.ros.topic;
726  latency_topic.replace(latency_topic.find("//"), 2, "/");
727  mqtt2ros.ros.latency_publisher =
728  create_publisher<std_msgs::msg::Float64>(latency_topic, 1);
729  }
730  mqtt2ros.ros.latency_publisher->publish(latency_msg);
731 
732  msg_length -= stamp_length_;
733  msg_offset += stamp_length_;
734  }
735 
736  // copy ROS message from MQTT message to generic message buffer
737  rclcpp::SerializedMessage serialized_msg(msg_length);
738  std::memcpy(serialized_msg.get_rcl_serialized_message().buffer,
739  &(payload[msg_offset]), msg_length);
740  serialized_msg.get_rcl_serialized_message().buffer_length = msg_length;
741 
742  // publish generic ROS message
743  RCLCPP_DEBUG(
744  get_logger(),
745  "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
746  mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str());
747  mqtt2ros.ros.publisher->publish(serialized_msg);
748 }
749 
750 
751 void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
752 
753  std::string mqtt_topic = mqtt_msg->get_topic();
754  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
755  const std::string str_msg = mqtt_msg->to_string();
756 
757  bool found_primitive = false;
758  std::string ros_msg_type = "std_msgs/msg/String";
759  rclcpp::SerializedMessage serialized_msg;
760 
761  // check for bool
762  if (!found_primitive) {
763  std::string bool_str = str_msg;
764  std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(),
765  ::tolower);
766  if (bool_str == "true" || bool_str == "false") {
767 
768  bool bool_msg = (bool_str == "true");
769 
770  // construct and serialize ROS message
771  std_msgs::msg::Bool msg;
772  msg.data = bool_msg;
773  serializeRosMessage(msg, serialized_msg);
774 
775  ros_msg_type = "std_msgs/msg/Bool";
776  found_primitive = true;
777  }
778  }
779 
780  // check for int
781  if (!found_primitive) {
782  std::size_t pos;
783  try {
784  const int int_msg = std::stoi(str_msg, &pos);
785  if (pos == str_msg.size()) {
786 
787  // construct and serialize ROS message
788  std_msgs::msg::Int32 msg;
789  msg.data = int_msg;
790  serializeRosMessage(msg, serialized_msg);
791 
792  ros_msg_type = "std_msgs/msg/Int32";
793  found_primitive = true;
794  }
795  } catch (const std::invalid_argument& ex) {
796  } catch (const std::out_of_range& ex) {
797  }
798  }
799 
800  // check for float
801  if (!found_primitive) {
802  std::size_t pos;
803  try {
804  const float float_msg = std::stof(str_msg, &pos);
805  if (pos == str_msg.size()) {
806 
807  // construct and serialize ROS message
808  std_msgs::msg::Float32 msg;
809  msg.data = float_msg;
810  serializeRosMessage(msg, serialized_msg);
811 
812  ros_msg_type = "std_msgs/msg/Float32";
813  found_primitive = true;
814  }
815  } catch (const std::invalid_argument& ex) {
816  } catch (const std::out_of_range& ex) {
817  }
818  }
819 
820  // fall back to string
821  if (!found_primitive) {
822 
823  // construct and serialize ROS message
824  std_msgs::msg::String msg;
825  msg.data = str_msg;
826  serializeRosMessage(msg, serialized_msg);
827  }
828 
829  // if ROS message type has changed
830  if (ros_msg_type != mqtt2ros.ros.msg_type) {
831 
832  mqtt2ros.ros.msg_type = ros_msg_type;
833  RCLCPP_INFO(get_logger(),
834  "ROS publisher message type on topic '%s' set to '%s'",
835  mqtt2ros.ros.topic.c_str(), ros_msg_type.c_str());
836 
837  // recreate generic publisher
838  try {
839  mqtt2ros.ros.publisher = create_generic_publisher(
840  mqtt2ros.ros.topic, ros_msg_type, mqtt2ros.ros.queue_size);
841  } catch (rclcpp::exceptions::RCLError& e) {
842  RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s",
843  e.what());
844  return;
845  }
846  }
847 
848  // publish
849  RCLCPP_DEBUG(
850  get_logger(),
851  "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
852  mqtt2ros.ros.msg_type.c_str(), mqtt2ros.ros.topic.c_str());
853  mqtt2ros.ros.publisher->publish(serialized_msg);
854 }
855 
856 
857 void MqttClient::connected(const std::string& cause) {
858 
859  (void) cause; // Avoid compiler warning for unused parameter.
860 
861  is_connected_ = true;
862  std::string as_client =
863  client_config_.id.empty()
864  ? ""
865  : std::string(" as '") + client_config_.id + std::string("'");
866  RCLCPP_INFO(get_logger(), "Connected to broker at '%s'%s",
867  client_->get_server_uri().c_str(), as_client.c_str());
868 
869  // subscribe MQTT topics
870  for (const auto& [mqtt_topic, mqtt2ros] : mqtt2ros_) {
871  std::string mqtt_topic_to_subscribe = mqtt_topic;
872  if (!mqtt2ros.primitive) // subscribe topics for ROS message types first
873  mqtt_topic_to_subscribe = kRosMsgTypeMqttTopicPrefix + mqtt_topic;
874  client_->subscribe(mqtt_topic_to_subscribe, mqtt2ros.mqtt.qos);
875  RCLCPP_INFO(get_logger(), "Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str());
876  }
877 }
878 
879 
880 void MqttClient::connection_lost(const std::string& cause) {
881 
882  (void) cause; // Avoid compiler warning for unused parameter.
883 
884  RCLCPP_ERROR(get_logger(),
885  "Connection to broker lost, will try to reconnect...");
886  is_connected_ = false;
887  connect();
888 }
889 
890 
892 
893  return is_connected_;
894 }
895 
896 
898  mqtt_client_interfaces::srv::IsConnected::Request::SharedPtr request,
899  mqtt_client_interfaces::srv::IsConnected::Response::SharedPtr response) {
900 
901  (void) request; // Avoid compiler warning for unused parameter.
902  response->connected = isConnected();
903 }
904 
905 
906 void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
907 
908  // instantly take arrival timestamp
909  rclcpp::Time arrival_stamp(
910  builtin_interfaces::msg::Time(rclcpp::Clock(RCL_SYSTEM_TIME).now()));
911 
912  std::string mqtt_topic = mqtt_msg->get_topic();
913  RCLCPP_DEBUG(get_logger(), "Received MQTT message on topic '%s'",
914  mqtt_topic.c_str());
915 
916  // publish directly if primitive
917  if (mqtt2ros_.count(mqtt_topic) > 0) {
918  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
919  if (mqtt2ros.primitive) {
920  mqtt2primitive(mqtt_msg);
921  return;
922  }
923  }
924 
925  // else first check for ROS message type
926  bool msg_contains_ros_msg_type =
927  mqtt_topic.find(kRosMsgTypeMqttTopicPrefix) != std::string::npos;
928  if (msg_contains_ros_msg_type) {
929 
930  // copy message type to generic message buffer
931  auto& payload = mqtt_msg->get_payload_ref();
932  uint32_t payload_length = static_cast<uint32_t>(payload.size());
933  rclcpp::SerializedMessage serialized_ros_msg_type(payload_length);
934  std::memcpy(serialized_ros_msg_type.get_rcl_serialized_message().buffer,
935  &(payload[0]), payload_length);
936  serialized_ros_msg_type.get_rcl_serialized_message().buffer_length = payload_length;
937 
938  // deserialize ROS message type
939  mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
940  deserializeRosMessage(serialized_ros_msg_type, ros_msg_type);
941 
942  // reconstruct corresponding MQTT data topic
943  std::string mqtt_data_topic = mqtt_topic;
944  mqtt_data_topic.erase(mqtt_data_topic.find(kRosMsgTypeMqttTopicPrefix),
945  kRosMsgTypeMqttTopicPrefix.length());
946  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_data_topic];
947 
948  // if ROS message type has changed
949  if (ros_msg_type.name != mqtt2ros.ros.msg_type) {
950 
951  mqtt2ros.ros.msg_type = ros_msg_type.name;
952  mqtt2ros.stamped = ros_msg_type.stamped;
953  RCLCPP_INFO(get_logger(),
954  "ROS publisher message type on topic '%s' set to '%s'",
955  mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
956 
957  // recreate generic publisher
958  try {
959  mqtt2ros.ros.publisher = create_generic_publisher(
960  mqtt2ros.ros.topic, ros_msg_type.name, mqtt2ros.ros.queue_size);
961  } catch (rclcpp::exceptions::RCLError& e) {
962  RCLCPP_ERROR(get_logger(), "Failed to create generic publisher: %s",
963  e.what());
964  return;
965  }
966 
967  // subscribe to MQTT topic with actual ROS messages
968  client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);
969  RCLCPP_DEBUG(get_logger(), "Subscribed MQTT topic '%s'",
970  mqtt_data_topic.c_str());
971  }
972 
973  } else {
974 
975  // publish ROS message, if publisher initialized
976  if (!mqtt2ros_[mqtt_topic].ros.msg_type.empty()) {
977  mqtt2ros(mqtt_msg, arrival_stamp);
978  } else {
979  RCLCPP_WARN(
980  get_logger(),
981  "ROS publisher for data from MQTT topic '%s' is not yet initialized: "
982  "ROS message type not yet known",
983  mqtt_topic.c_str());
984  }
985  }
986 }
987 
988 
989 void MqttClient::delivery_complete(mqtt::delivery_token_ptr token) {
990 
991  (void) token; // Avoid compiler warning for unused parameter.
992 }
993 
994 
995 void MqttClient::on_success(const mqtt::token& token) {
996 
997  (void) token; // Avoid compiler warning for unused parameter.
998  is_connected_ = true;
999 }
1000 
1001 
1002 void MqttClient::on_failure(const mqtt::token& token) {
1003 
1004  RCLCPP_ERROR(
1005  get_logger(),
1006  "Connection to broker failed (return code %d), will automatically "
1007  "retry...",
1008  token.get_return_code());
1009  is_connected_ = false;
1010 }
1011 
1012 } // namespace mqtt_client
mqtt_client::MqttClient::ClientConfig::qos
int qos
last-will QoS value
Definition: MqttClient.h:357
mqtt_client::MqttClient::ClientConfig::size
int size
client buffer size
Definition: MqttClient.h:351
response
const std::string response
mqtt_client::MqttClient::mqtt2ros
void mqtt2ros(mqtt::const_message_ptr mqtt_msg, const ros::WallTime &arrival_stamp=ros::WallTime::now())
Publishes a ROS message received via MQTT to ROS.
Definition: MqttClient.cpp:607
mqtt_client::MqttClient::ClientConfig::password
std::string password
decryption password for private key
Definition: MqttClient.h:366
mqtt_client::MqttClient::setupClient
void setupClient()
Sets up the client connection options and initializes the client object.
Definition: MqttClient.cpp:410
mqtt_client::MqttClient::on_success
void on_success(const mqtt::token &token) override
Callback for when a MQTT action succeeds.
Definition: MqttClient.cpp:934
mqtt_client::MqttClient::resolvePath
std::filesystem::path resolvePath(const std::string &path_string)
Converts a string to a path object resolving paths relative to ROS_HOME.
Definition: MqttClient.cpp:365
mqtt_client::MqttClient::mqtt2primitive
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg)
Publishes a primitive message received via MQTT to ROS.
Definition: MqttClient.cpp:671
mqtt_client::MqttClient::on_failure
void on_failure(const mqtt::token &token) override
Callback for when a MQTT action fails.
Definition: MqttClient.cpp:940
mqtt_client::MqttClient::stamp_length_
uint32_t stamp_length_
Definition: MqttClient.ros2.hpp:473
mqtt_client::MqttClient::ClientConfig::verify
bool verify
Definition: MqttClient.h:368
ros
mqtt_client::MqttClient::delivery_complete
void delivery_complete(mqtt::delivery_token_ptr token) override
Callback for when delivery for a MQTT message has been completed.
Definition: MqttClient.cpp:931
mqtt_client::MqttClient::is_connected_
bool is_connected_
Status variable keeping track of connection status to broker.
Definition: MqttClient.h:445
mqtt_client::MqttClient::check_subscriptions_timer_
rclcpp::TimerBase::SharedPtr check_subscriptions_timer_
Timer to repeatedly check active ROS topics for topics to subscribe.
Definition: MqttClient.ros2.hpp:427
mqtt_client::MqttClient::ClientConfig::key
std::filesystem::path key
client private keyfile
Definition: MqttClient.h:365
mqtt_client::MqttClient::ClientConfig::max_inflight
int max_inflight
maximum number of inflight messages
Definition: MqttClient.h:362
mqtt_client::MqttClient::ClientConfig::buffer
struct mqtt_client::MqttClient::ClientConfig::@1 buffer
client buffer-related variables
mqtt_client::MqttClient::loadParameter
bool loadParameter(const std::string &key, std::string &value)
Loads requested ROS parameter from parameter server.
Definition: MqttClient.cpp:339
mqtt_client::MqttClient::client_config_
ClientConfig client_config_
Client parameters.
Definition: MqttClient.h:455
mqtt_client::MqttClient::BrokerConfig::pass
std::string pass
password
Definition: MqttClient.h:336
mqtt_client::MqttClient::ClientConfig::tls
struct mqtt_client::MqttClient::ClientConfig::@3 tls
SSL/TLS-related variables.
mqtt_client::MqttClient::client_
std::shared_ptr< mqtt::async_client > client_
MQTT client variable.
Definition: MqttClient.h:460
mqtt_client::MqttClient::ClientConfig::message
std::string message
last-will message
Definition: MqttClient.h:356
mqtt_client::MqttClient::ClientConfig::version
int version
TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options....
Definition: MqttClient.h:367
mqtt_client::MqttClient::Mqtt2RosInterface
Struct containing variables related to a MQTT2ROS connection.
Definition: MqttClient.h:394
mqtt_client::MqttClient::ClientConfig::topic
std::string topic
last-will topic
Definition: MqttClient.h:355
mqtt_client::MqttClient::kRosMsgTypeMqttTopicPrefix
static const std::string kRosMsgTypeMqttTopicPrefix
MQTT topic prefix under which ROS message type information is published.
Definition: MqttClient.h:418
mqtt_client::MqttClient::BrokerConfig::user
std::string user
username
Definition: MqttClient.h:335
mqtt_client::MqttClient::ClientConfig::clean_session
bool clean_session
whether client requests clean session
Definition: MqttClient.h:360
mqtt_client::MqttClient::BrokerConfig::port
int port
broker port
Definition: MqttClient.h:334
MqttClient.ros2.hpp
mqtt_client::MqttClient::ros2mqtt
void ros2mqtt(const topic_tools::ShapeShifter::ConstPtr &ros_msg, const std::string &ros_topic)
Serializes and publishes a generic ROS message to the MQTT broker.
Definition: MqttClient.cpp:490
mqtt_client::MqttClient::ClientConfig::last_will
struct mqtt_client::MqttClient::ClientConfig::@2 last_will
last-will-related variables
mqtt_client::MqttClient::ClientConfig::directory
std::filesystem::path directory
client buffer directory
Definition: MqttClient.h:352
mqtt_client::MqttClient::connect_options_
mqtt::connect_options connect_options_
MQTT client connection options.
Definition: MqttClient.h:465
mqtt_client::MqttClient::Ros2MqttInterface
Struct containing variables related to a ROS2MQTT connection.
Definition: MqttClient.h:377
mqtt_client::MqttClient
ROS Nodelet for sending and receiving ROS messages via MQTT.
Definition: MqttClient.h:58
mqtt_client::MqttClient::setupSubscriptions
void setupSubscriptions()
Checks all active ROS topics in order to set up generic subscribers.
Definition: MqttClient.ros2.cpp:460
mqtt_client::MqttClient::ClientConfig::alpn_protos
std::vector< std::string > alpn_protos
list of ALPN protocols
Definition: MqttClient.h:370
mqtt_client::MqttClient::message_arrived
void message_arrived(mqtt::const_message_ptr mqtt_msg) override
Callback for when the client receives a MQTT message from the broker.
Definition: MqttClient.cpp:846
mqtt_client::MqttClient::connected
void connected(const std::string &cause) override
Callback for when the client has successfully connected to the broker.
Definition: MqttClient.cpp:801
mqtt_client::MqttClient::isConnectedService
bool isConnectedService(mqtt_client_interfaces::IsConnected::Request &request, mqtt_client_interfaces::IsConnected::Response &response)
ROS service returning whether the client is connected to the broker.
Definition: MqttClient.cpp:837
mqtt_client::MqttClient::broker_config_
BrokerConfig broker_config_
Broker parameters.
Definition: MqttClient.h:450
mqtt_client::MqttClient::kLatencyRosTopicPrefix
static const std::string kLatencyRosTopicPrefix
ROS topic prefix under which ROS2MQTT2ROS latencies are published.
Definition: MqttClient.h:425
mqtt_client::MqttClient::mqtt2ros_
std::map< std::string, Mqtt2RosInterface > mqtt2ros_
MQTT2ROS connection variables sorted by MQTT topic.
Definition: MqttClient.h:475
mqtt_client::MqttClient::ClientConfig::certificate
std::filesystem::path certificate
client certificate
Definition: MqttClient.h:364
mqtt_client::MqttClient::ClientConfig::enabled
bool enabled
whether client buffer is enabled
Definition: MqttClient.h:350
mqtt_client::MqttClient::MqttClient
MqttClient(const rclcpp::NodeOptions &options)
Initializes node.
Definition: MqttClient.ros2.cpp:144
mqtt_client::MqttClient::ros2mqtt_
std::map< std::string, Ros2MqttInterface > ros2mqtt_
ROS2MQTT connection variables sorted by ROS topic.
Definition: MqttClient.h:470
mqtt_client::MqttClient::setup
void setup()
Initializes broker connection and subscriptions.
Definition: MqttClient.cpp:384
mqtt_client::MqttClient::connection_lost
void connection_lost(const std::string &cause) override
Callback for when the client has lost connection to the broker.
Definition: MqttClient.cpp:823
mqtt_client::MqttClient::is_connected_service_
ros::ServiceServer is_connected_service_
ROS Service server for providing connection status.
Definition: MqttClient.h:440
mqtt_client::deserializeRosMessage
void deserializeRosMessage(const rclcpp::SerializedMessage &serialized_msg, T &msg)
Definition: MqttClient.ros2.hpp:553
mqtt_client::MqttClient::BrokerConfig::ca_certificate
std::filesystem::path ca_certificate
public CA certificate trusted by client
Definition: MqttClient.h:340
mqtt_client::MqttClient::ClientConfig::keep_alive_interval
double keep_alive_interval
keep-alive interval
Definition: MqttClient.h:361
mqtt_client::MqttClient::BrokerConfig::host
std::string host
broker host
Definition: MqttClient.h:333
mqtt_client::MqttClient::connect
void connect()
Connects to the broker using the member client and options.
Definition: MqttClient.cpp:472
default_value
def default_value(type_)
mqtt_client::serializeRosMessage
void serializeRosMessage(const T &msg, std::vector< uint8_t > &buffer)
Definition: MqttClient.h:541
mqtt_client::MqttClient::ClientConfig::id
std::string id
client unique ID
Definition: MqttClient.h:348
msg_type
def msg_type(f)
mqtt_client::primitiveRosMessageToString
bool primitiveRosMessageToString(const topic_tools::ShapeShifter::ConstPtr &msg, std::string &primitive)
Extracts string of primitive data types from ROS message.
Definition: MqttClient.cpp:95
mqtt_client::MqttClient::BrokerConfig::enabled
bool enabled
whether to connect via SSL/TLS
Definition: MqttClient.h:338
mqtt_client::MqttClient::BrokerConfig::tls
struct mqtt_client::MqttClient::BrokerConfig::@0 tls
SSL/TLS-related variables.
mqtt_client::MqttClient::isConnected
bool isConnected()
Returns whether the client is connected to the broker.
Definition: MqttClient.cpp:831
mqtt_client
Namespace for the mqtt_client package.
Definition: MqttClient.h:46
mqtt_client::MqttClient::ClientConfig::retained
bool retained
whether last-will is retained
Definition: MqttClient.h:358
mqtt_client::MqttClient::loadParameters
void loadParameters()
Loads ROS parameters from parameter server.
Definition: MqttClient.cpp:158


mqtt_client
Author(s): Lennart Reiher , Bastian Lampe , Christian Wende
autogenerated on Thu Oct 5 2023 02:09:10