MqttClient.ros2.hpp
Go to the documentation of this file.
1 /*
2 ==============================================================================
3 MIT License
4 
5 Copyright 2022 Institute for Automotive Engineering of RWTH Aachen University.
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in all
15 copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 SOFTWARE.
24 ==============================================================================
25 */
26 
27 
28 #pragma once
29 
30 #include <filesystem>
31 #include <map>
32 #include <memory>
33 #include <string>
34 
35 #define FMT_HEADER_ONLY
36 #include <fmt/format.h>
37 #include <mqtt_client_interfaces/srv/is_connected.hpp>
38 #include <mqtt/async_client.h>
39 #include <rclcpp/rclcpp.hpp>
40 #include <rclcpp/serialization.hpp>
41 #include <std_msgs/msg/float64.hpp>
42 
43 
47 namespace mqtt_client {
48 
49 
59 class MqttClient : public rclcpp::Node,
60  public virtual mqtt::callback,
61  public virtual mqtt::iaction_listener {
62 
63  public:
69  explicit MqttClient(const rclcpp::NodeOptions& options);
70 
71  protected:
75  void loadParameters();
76 
86  bool loadParameter(const std::string& key, std::string& value);
87 
99  bool loadParameter(const std::string& key, std::string& value,
100  const std::string& default_value);
101 
113  template <typename T>
114  bool loadParameter(const std::string& key, T& value);
115 
129  template <typename T>
130  bool loadParameter(const std::string& key, T& value, const T& default_value);
131 
143  template <typename T>
144  bool loadParameter(const std::string& key, std::vector<T>& value);
145 
159  template <typename T>
160  bool loadParameter(const std::string& key, std::vector<T>& value, const std::vector<T>& default_value);
161 
173  std::filesystem::path resolvePath(const std::string& path_string);
174 
178  void setup();
179 
183  void setupSubscriptions();
184 
189  void setupClient();
190 
194  void connect();
195 
211  void ros2mqtt(
212  const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
213  const std::string& ros_topic);
214 
231  void mqtt2ros(mqtt::const_message_ptr mqtt_msg,
232  const rclcpp::Time& arrival_stamp);
233 
241  void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);
242 
251  void connected(const std::string& cause) override;
252 
260  void connection_lost(const std::string& cause) override;
261 
268  bool isConnected();
269 
276  void isConnectedService(
277  mqtt_client_interfaces::srv::IsConnected::Request::SharedPtr request,
278  mqtt_client_interfaces::srv::IsConnected::Response::SharedPtr response);
279 
291  void message_arrived(mqtt::const_message_ptr mqtt_msg) override;
292 
300  void delivery_complete(mqtt::delivery_token_ptr token) override;
301 
310  void on_success(const mqtt::token& token) override;
311 
320  void on_failure(const mqtt::token& token) override;
321 
322  protected:
326  struct BrokerConfig {
327  std::string host;
328  int port;
329  std::string user;
330  std::string pass;
331  struct {
332  bool enabled;
333  std::filesystem::path
335  } tls;
336  };
337 
341  struct ClientConfig {
342  std::string id;
343  struct {
344  bool enabled;
345  int size;
346  std::filesystem::path directory;
347  } buffer;
348  struct {
349  std::string topic;
350  std::string message;
351  int qos;
352  bool retained;
353  } last_will;
354  bool clean_session;
355  double keep_alive_interval;
356  int max_inflight;
357  struct {
358  std::filesystem::path certificate;
359  std::filesystem::path key;
360  std::string password;
361  int version;
362  bool verify;
363  std::vector<std::string> alpn_protos;
365  } tls;
366  };
367 
371  struct Ros2MqttInterface {
372  struct {
373  rclcpp::GenericSubscription::SharedPtr
375  std::string msg_type;
376  int queue_size = 1;
377  } ros;
378  struct {
379  std::string topic;
380  int qos = 0;
381  bool retained = false;
382  } mqtt;
383  bool primitive = false;
384  bool stamped = false;
385  };
386 
390  struct Mqtt2RosInterface {
391  struct {
392  int qos = 0;
393  } mqtt;
394  struct {
395  std::string topic;
396  std::string msg_type;
397  rclcpp::GenericPublisher::SharedPtr publisher;
398  rclcpp::Publisher<std_msgs::msg::Float64>::SharedPtr
400  int queue_size = 1;
401  bool latched = false;
402  } ros;
403  bool primitive = false;
404  bool stamped = false;
406  };
407 
408  protected:
415  static const std::string kRosMsgTypeMqttTopicPrefix;
416 
422  static const std::string kLatencyRosTopicPrefix;
423 
427  rclcpp::TimerBase::SharedPtr check_subscriptions_timer_;
428 
432  rclcpp::Service<mqtt_client_interfaces::srv::IsConnected>::SharedPtr
434 
438  bool is_connected_ = false;
439 
444 
449 
453  std::shared_ptr<mqtt::async_client> client_;
454 
458  mqtt::connect_options connect_options_;
459 
463  std::map<std::string, Ros2MqttInterface> ros2mqtt_;
464 
468  std::map<std::string, Mqtt2RosInterface> mqtt2ros_;
469 
473  uint32_t stamp_length_;
474 };
475 
476 
477 template <typename T>
478 bool MqttClient::loadParameter(const std::string& key, T& value) {
479  bool found = get_parameter(key, value);
480  if (found)
481  RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
482  std::to_string(value).c_str());
483  return found;
484 }
485 
486 
487 template <typename T>
488 bool MqttClient::loadParameter(const std::string& key, T& value,
489  const T& default_value) {
490  bool found = get_parameter_or(key, value, default_value);
491  if (!found)
492  RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
493  key.c_str(), std::to_string(default_value).c_str());
494  if (found)
495  RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
496  std::to_string(value).c_str());
497  return found;
498 }
499 
500 
501 template <typename T>
502 bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value)
503 {
504  const bool found = get_parameter(key, value);
505  if (found)
506  RCLCPP_WARN(get_logger(), "Retrieved parameter '%s' = '[%s]'", key.c_str(),
507  fmt::format("{}", fmt::join(value, ", ")).c_str());
508  return found;
509 }
510 
511 
512 template <typename T>
513 bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value,
514  const std::vector<T>& default_value)
515 {
516  const bool found = get_parameter_or(key, value, default_value);
517  if (!found)
518  RCLCPP_WARN(get_logger(), "Parameter '%s' not set, defaulting to '%s'",
519  key.c_str(), fmt::format("{}", fmt::join(value, ", ")).c_str());
520  if (found)
521  RCLCPP_DEBUG(get_logger(), "Retrieved parameter '%s' = '%s'", key.c_str(),
522  fmt::format("{}", fmt::join(value, ", ")).c_str());
523  return found;
524 }
525 
526 
535 template <typename T>
536 void serializeRosMessage(const T& msg,
537  rclcpp::SerializedMessage& serialized_msg) {
538 
539  rclcpp::Serialization<T> serializer;
540  serializer.serialize_message(&msg, &serialized_msg);
541 }
542 
543 
552 template <typename T>
553 void deserializeRosMessage(const rclcpp::SerializedMessage& serialized_msg,
554  T& msg) {
555 
556  rclcpp::Serialization<T> serializer;
557  serializer.deserialize_message(&serialized_msg, &msg);
558 }
559 
560 } // namespace mqtt_client
mqtt_client::MqttClient::ClientConfig::qos
int qos
last-will QoS value
Definition: MqttClient.h:357
mqtt_client::MqttClient::ClientConfig::size
int size
client buffer size
Definition: MqttClient.h:351
response
const std::string response
mqtt_client::MqttClient::mqtt2ros
void mqtt2ros(mqtt::const_message_ptr mqtt_msg, const ros::WallTime &arrival_stamp=ros::WallTime::now())
Publishes a ROS message received via MQTT to ROS.
Definition: MqttClient.cpp:607
mqtt_client::MqttClient::Ros2MqttInterface::ros
struct mqtt_client::MqttClient::Ros2MqttInterface::@4 ros
ROS-related variables.
mqtt_client::MqttClient::ClientConfig::password
std::string password
decryption password for private key
Definition: MqttClient.h:366
mqtt_client::MqttClient::Ros2MqttInterface::subscriber
rclcpp::GenericSubscription::SharedPtr subscriber
generic ROS subscriber
Definition: MqttClient.ros2.hpp:374
mqtt_client::MqttClient::setupClient
void setupClient()
Sets up the client connection options and initializes the client object.
Definition: MqttClient.cpp:410
mqtt_client::MqttClient::on_success
void on_success(const mqtt::token &token) override
Callback for when a MQTT action succeeds.
Definition: MqttClient.cpp:934
mqtt_client::MqttClient::resolvePath
std::filesystem::path resolvePath(const std::string &path_string)
Converts a string to a path object resolving paths relative to ROS_HOME.
Definition: MqttClient.cpp:365
mqtt_client::MqttClient::Ros2MqttInterface::qos
int qos
MQTT QoS value.
Definition: MqttClient.h:384
mqtt_client::MqttClient::ClientConfig
Struct containing client parameters.
Definition: MqttClient.h:347
mqtt_client::MqttClient::is_connected_service_
rclcpp::Service< mqtt_client_interfaces::srv::IsConnected >::SharedPtr is_connected_service_
ROS Service server for providing connection status.
Definition: MqttClient.ros2.hpp:433
mqtt_client::MqttClient::mqtt2primitive
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg)
Publishes a primitive message received via MQTT to ROS.
Definition: MqttClient.cpp:671
mqtt_client::MqttClient::Ros2MqttInterface::msg_type
std::string msg_type
message type of subscriber
Definition: MqttClient.ros2.hpp:375
mqtt_client::MqttClient::on_failure
void on_failure(const mqtt::token &token) override
Callback for when a MQTT action fails.
Definition: MqttClient.cpp:940
mqtt_client::MqttClient::stamp_length_
uint32_t stamp_length_
Definition: MqttClient.ros2.hpp:473
mqtt_client::MqttClient::Mqtt2RosInterface::mqtt
struct mqtt_client::MqttClient::Mqtt2RosInterface::@6 mqtt
MQTT-related variables.
mqtt_client::MqttClient::ClientConfig::verify
bool verify
Definition: MqttClient.h:368
mqtt_client::MqttClient::delivery_complete
void delivery_complete(mqtt::delivery_token_ptr token) override
Callback for when delivery for a MQTT message has been completed.
Definition: MqttClient.cpp:931
mqtt_client::MqttClient::is_connected_
bool is_connected_
Status variable keeping track of connection status to broker.
Definition: MqttClient.h:445
mqtt_client::MqttClient::check_subscriptions_timer_
rclcpp::TimerBase::SharedPtr check_subscriptions_timer_
Timer to repeatedly check active ROS topics for topics to subscribe.
Definition: MqttClient.ros2.hpp:427
mqtt_client::MqttClient::ClientConfig::key
std::filesystem::path key
client private keyfile
Definition: MqttClient.h:365
mqtt_client::MqttClient::ClientConfig::max_inflight
int max_inflight
maximum number of inflight messages
Definition: MqttClient.h:362
mqtt_client::MqttClient::ClientConfig::buffer
struct mqtt_client::MqttClient::ClientConfig::@1 buffer
client buffer-related variables
mqtt_client::MqttClient::loadParameter
bool loadParameter(const std::string &key, std::string &value)
Loads requested ROS parameter from parameter server.
Definition: MqttClient.cpp:339
mqtt_client::MqttClient::client_config_
ClientConfig client_config_
Client parameters.
Definition: MqttClient.h:455
mqtt_client::MqttClient::BrokerConfig::pass
std::string pass
password
Definition: MqttClient.h:336
mqtt_client::MqttClient::ClientConfig::tls
struct mqtt_client::MqttClient::ClientConfig::@3 tls
SSL/TLS-related variables.
mqtt_client::MqttClient::client_
std::shared_ptr< mqtt::async_client > client_
MQTT client variable.
Definition: MqttClient.h:460
mqtt_client::MqttClient::ClientConfig::message
std::string message
last-will message
Definition: MqttClient.h:356
mqtt_client::MqttClient::ClientConfig::version
int version
TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options....
Definition: MqttClient.h:367
mqtt_client::MqttClient::Mqtt2RosInterface::msg_type
std::string msg_type
message type of publisher
Definition: MqttClient.ros2.hpp:396
mqtt_client::MqttClient::ClientConfig::topic
std::string topic
last-will topic
Definition: MqttClient.h:355
mqtt_client::MqttClient::kRosMsgTypeMqttTopicPrefix
static const std::string kRosMsgTypeMqttTopicPrefix
MQTT topic prefix under which ROS message type information is published.
Definition: MqttClient.h:418
mqtt_client::MqttClient::BrokerConfig::user
std::string user
username
Definition: MqttClient.h:335
mqtt_client::MqttClient::ClientConfig::clean_session
bool clean_session
whether client requests clean session
Definition: MqttClient.h:360
mqtt_client::MqttClient::BrokerConfig::port
int port
broker port
Definition: MqttClient.h:334
mqtt_client::MqttClient::Mqtt2RosInterface::latency_publisher
rclcpp::Publisher< std_msgs::msg::Float64 >::SharedPtr latency_publisher
ROS publisher for latency.
Definition: MqttClient.ros2.hpp:399
mqtt_client::MqttClient::ros2mqtt
void ros2mqtt(const topic_tools::ShapeShifter::ConstPtr &ros_msg, const std::string &ros_topic)
Serializes and publishes a generic ROS message to the MQTT broker.
Definition: MqttClient.cpp:490
mqtt_client::MqttClient::ClientConfig::last_will
struct mqtt_client::MqttClient::ClientConfig::@2 last_will
last-will-related variables
mqtt_client::MqttClient::Mqtt2RosInterface::ros
struct mqtt_client::MqttClient::Mqtt2RosInterface::@7 ros
ROS-related variables.
mqtt_client::MqttClient::ClientConfig::directory
std::filesystem::path directory
client buffer directory
Definition: MqttClient.h:352
mqtt_client::MqttClient::Mqtt2RosInterface::topic
std::string topic
ROS topic.
Definition: MqttClient.h:399
mqtt_client::MqttClient::connect_options_
mqtt::connect_options connect_options_
MQTT client connection options.
Definition: MqttClient.h:465
mqtt_client::MqttClient::Mqtt2RosInterface::qos
int qos
MQTT QoS value.
Definition: MqttClient.h:396
mqtt_client::MqttClient::Ros2MqttInterface::primitive
bool primitive
whether to publish as primitive message
Definition: MqttClient.h:387
mqtt_client::MqttClient::setupSubscriptions
void setupSubscriptions()
Checks all active ROS topics in order to set up generic subscribers.
Definition: MqttClient.ros2.cpp:460
mqtt_client::MqttClient::ClientConfig::alpn_protos
std::vector< std::string > alpn_protos
list of ALPN protocols
Definition: MqttClient.h:370
mqtt_client::MqttClient::message_arrived
void message_arrived(mqtt::const_message_ptr mqtt_msg) override
Callback for when the client receives a MQTT message from the broker.
Definition: MqttClient.cpp:846
mqtt_client::MqttClient::connected
void connected(const std::string &cause) override
Callback for when the client has successfully connected to the broker.
Definition: MqttClient.cpp:801
mqtt_client::MqttClient::isConnectedService
bool isConnectedService(mqtt_client_interfaces::IsConnected::Request &request, mqtt_client_interfaces::IsConnected::Response &response)
ROS service returning whether the client is connected to the broker.
Definition: MqttClient.cpp:837
mqtt_client::MqttClient::broker_config_
BrokerConfig broker_config_
Broker parameters.
Definition: MqttClient.h:450
mqtt_client::MqttClient::kLatencyRosTopicPrefix
static const std::string kLatencyRosTopicPrefix
ROS topic prefix under which ROS2MQTT2ROS latencies are published.
Definition: MqttClient.h:425
mqtt_client::MqttClient::mqtt2ros_
std::map< std::string, Mqtt2RosInterface > mqtt2ros_
MQTT2ROS connection variables sorted by MQTT topic.
Definition: MqttClient.h:475
mqtt_client::MqttClient::ClientConfig::certificate
std::filesystem::path certificate
client certificate
Definition: MqttClient.h:364
mqtt_client::MqttClient::ClientConfig::enabled
bool enabled
whether client buffer is enabled
Definition: MqttClient.h:350
mqtt_client::MqttClient::MqttClient
MqttClient(const rclcpp::NodeOptions &options)
Initializes node.
Definition: MqttClient.ros2.cpp:144
mqtt_client::MqttClient::Mqtt2RosInterface::stamped
bool stamped
whether timestamp is injected
Definition: MqttClient.h:408
mqtt_client::MqttClient::ros2mqtt_
std::map< std::string, Ros2MqttInterface > ros2mqtt_
ROS2MQTT connection variables sorted by ROS topic.
Definition: MqttClient.h:470
mqtt_client::MqttClient::setup
void setup()
Initializes broker connection and subscriptions.
Definition: MqttClient.cpp:384
mqtt_client::MqttClient::connection_lost
void connection_lost(const std::string &cause) override
Callback for when the client has lost connection to the broker.
Definition: MqttClient.cpp:823
mqtt_client::MqttClient::Ros2MqttInterface::stamped
bool stamped
whether to inject timestamp in MQTT message
Definition: MqttClient.h:388
mqtt_client::MqttClient::Ros2MqttInterface::mqtt
struct mqtt_client::MqttClient::Ros2MqttInterface::@5 mqtt
MQTT-related variables.
mqtt_client::deserializeRosMessage
void deserializeRosMessage(const rclcpp::SerializedMessage &serialized_msg, T &msg)
Definition: MqttClient.ros2.hpp:553
mqtt_client::MqttClient::BrokerConfig::ca_certificate
std::filesystem::path ca_certificate
public CA certificate trusted by client
Definition: MqttClient.h:340
mqtt_client::MqttClient::ClientConfig::keep_alive_interval
double keep_alive_interval
keep-alive interval
Definition: MqttClient.h:361
mqtt_client::MqttClient::Ros2MqttInterface::topic
std::string topic
MQTT topic.
Definition: MqttClient.h:383
mqtt_client::MqttClient::Mqtt2RosInterface::latched
bool latched
whether to latch ROS message
Definition: MqttClient.h:404
mqtt_client::MqttClient::Mqtt2RosInterface::queue_size
int queue_size
ROS publisher queue size.
Definition: MqttClient.h:403
mqtt_client::MqttClient::BrokerConfig::host
std::string host
broker host
Definition: MqttClient.h:333
mqtt_client::MqttClient::connect
void connect()
Connects to the broker using the member client and options.
Definition: MqttClient.cpp:472
default_value
def default_value(type_)
mqtt_client::serializeRosMessage
void serializeRosMessage(const T &msg, std::vector< uint8_t > &buffer)
Definition: MqttClient.h:541
mqtt_client::MqttClient::Mqtt2RosInterface::publisher
rclcpp::GenericPublisher::SharedPtr publisher
generic ROS publisher
Definition: MqttClient.ros2.hpp:397
mqtt_client::MqttClient::ClientConfig::id
std::string id
client unique ID
Definition: MqttClient.h:348
mqtt_client::MqttClient::Ros2MqttInterface::queue_size
int queue_size
ROS subscriber queue size.
Definition: MqttClient.h:380
mqtt_client::MqttClient::BrokerConfig::enabled
bool enabled
whether to connect via SSL/TLS
Definition: MqttClient.h:338
mqtt_client::MqttClient::BrokerConfig::tls
struct mqtt_client::MqttClient::BrokerConfig::@0 tls
SSL/TLS-related variables.
mqtt_client::MqttClient::isConnected
bool isConnected()
Returns whether the client is connected to the broker.
Definition: MqttClient.cpp:831
mqtt_client
Namespace for the mqtt_client package.
Definition: MqttClient.h:46
mqtt_client::MqttClient::Mqtt2RosInterface::primitive
bool primitive
Definition: MqttClient.h:406
mqtt_client::MqttClient::BrokerConfig
Struct containing broker parameters.
Definition: MqttClient.h:332
mqtt_client::MqttClient::ClientConfig::retained
bool retained
whether last-will is retained
Definition: MqttClient.h:358
mqtt_client::MqttClient::loadParameters
void loadParameters()
Loads ROS parameters from parameter server.
Definition: MqttClient.cpp:158
mqtt_client::MqttClient::Ros2MqttInterface::retained
bool retained
whether to retain MQTT message
Definition: MqttClient.h:385


mqtt_client
Author(s): Lennart Reiher , Bastian Lampe , Christian Wende
autogenerated on Thu Oct 5 2023 02:09:10