MqttClient.h
Go to the documentation of this file.
1 /*
2 ==============================================================================
3 MIT License
4 
5 Copyright 2022 Institute for Automotive Engineering of RWTH Aachen University.
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in all
15 copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 SOFTWARE.
24 ==============================================================================
25 */
26 
27 
28 #pragma once
29 
30 #include <filesystem>
31 #include <map>
32 #include <memory>
33 #include <string>
34 
35 #include <mqtt_client_interfaces/IsConnected.h>
36 #include <mqtt/async_client.h>
37 #include <nodelet/nodelet.h>
38 #include <ros/ros.h>
39 #include <rosfmt/full.h> // fmt::format, fmt::join
41 
42 
46 namespace mqtt_client {
47 
48 
59  public virtual mqtt::callback,
60  public virtual mqtt::iaction_listener {
61 
62  protected:
68  virtual void onInit() override;
69 
73  void loadParameters();
74 
84  bool loadParameter(const std::string& key, std::string& value);
85 
97  bool loadParameter(const std::string& key, std::string& value,
98  const std::string& default_value);
99 
111  template <typename T>
112  bool loadParameter(const std::string& key, T& value);
113 
127  template <typename T>
128  bool loadParameter(const std::string& key, T& value, const T& default_value);
129 
141  template <typename T>
142  bool loadParameter(const std::string& key, std::vector<T>& value);
143 
157  template <typename T>
158  bool loadParameter(const std::string& key, std::vector<T>& value, const std::vector<T>& default_value);
159 
171  std::filesystem::path resolvePath(const std::string& path_string);
172 
176  void setup();
177 
182  void setupClient();
183 
187  void connect();
188 
204  void ros2mqtt(const topic_tools::ShapeShifter::ConstPtr& ros_msg,
205  const std::string& ros_topic);
206 
223  void mqtt2ros(mqtt::const_message_ptr mqtt_msg,
224  const ros::WallTime& arrival_stamp = ros::WallTime::now());
225 
244  void mqtt2primitive(mqtt::const_message_ptr mqtt_msg);
245 
254  void connected(const std::string& cause) override;
255 
263  void connection_lost(const std::string& cause) override;
264 
271  bool isConnected();
272 
282  bool isConnectedService(
283  mqtt_client_interfaces::IsConnected::Request& request,
284  mqtt_client_interfaces::IsConnected::Response& response);
285 
297  void message_arrived(mqtt::const_message_ptr mqtt_msg) override;
298 
306  void delivery_complete(mqtt::delivery_token_ptr token) override;
307 
316  void on_success(const mqtt::token& token) override;
317 
326  void on_failure(const mqtt::token& token) override;
327 
328  protected:
332  struct BrokerConfig {
333  std::string host;
334  int port;
335  std::string user;
336  std::string pass;
337  struct {
338  bool enabled;
339  std::filesystem::path
341  } tls;
342  };
343 
347  struct ClientConfig {
348  std::string id;
349  struct {
350  bool enabled;
351  int size;
352  std::filesystem::path directory;
353  } buffer;
354  struct {
355  std::string topic;
356  std::string message;
357  int qos;
358  bool retained;
359  } last_will;
363  struct {
364  std::filesystem::path certificate;
365  std::filesystem::path key;
366  std::string password;
367  int version;
368  bool verify;
369  std::vector<std::string> alpn_protos;
371  } tls;
372  };
373 
378  struct {
380  int queue_size = 1;
381  } ros;
382  struct {
383  std::string topic;
384  int qos = 0;
385  bool retained = false;
386  } mqtt;
387  bool primitive = false;
388  bool stamped = false;
389  };
390 
395  struct {
396  int qos = 0;
397  } mqtt;
398  struct {
399  std::string topic;
403  int queue_size = 1;
404  bool latched = false;
405  } ros;
406  bool primitive = false;
407  bool stamped = false;
409  };
410 
411  protected:
418  static const std::string kRosMsgTypeMqttTopicPrefix;
419 
425  static const std::string kLatencyRosTopicPrefix;
426 
431 
436 
441 
445  bool is_connected_ = false;
446 
451 
456 
460  std::shared_ptr<mqtt::async_client> client_;
461 
465  mqtt::connect_options connect_options_;
466 
470  std::map<std::string, Ros2MqttInterface> ros2mqtt_;
471 
475  std::map<std::string, Mqtt2RosInterface> mqtt2ros_;
476 };
477 
478 
479 template <typename T>
480 bool MqttClient::loadParameter(const std::string& key, T& value) {
481  bool found = private_node_handle_.getParam(key, value);
482  if (found)
483  NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
484  std::to_string(value).c_str());
485  return found;
486 }
487 
488 
489 template <typename T>
490 bool MqttClient::loadParameter(const std::string& key, T& value,
491  const T& default_value) {
492  bool found = private_node_handle_.param<T>(key, value, default_value);
493  if (!found) {
495  NODELET_ERROR("Parameter '%s' has wrong data type", key.c_str());
496  NODELET_WARN("Parameter '%s' not set, defaulting to '%s'", key.c_str(),
497  std::to_string(default_value).c_str());
498  }
499  if (found)
500  NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
501  std::to_string(value).c_str());
502  return found;
503 }
504 
505 
506 template <typename T>
507 bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value)
508 {
509  const bool found = private_node_handle_.getParam(key, value);
510  if (found)
511  NODELET_DEBUG("Retrieved parameter '%s' = '[%s]'", key.c_str(),
512  fmt::format("{}", fmt::join(value, ", ")).c_str());
513  return found;
514 }
515 
516 
517 template <typename T>
518 bool MqttClient::loadParameter(const std::string& key, std::vector<T>& value,
519  const std::vector<T>& default_value)
520 {
521  const bool found = private_node_handle_.param<T>(key, value, default_value);
522  if (!found)
523  NODELET_WARN("Parameter '%s' not set, defaulting to '%s'", key.c_str(),
524  fmt::format("{}", fmt::join(value, ", ")).c_str());
525  if (found)
526  NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
527  fmt::format("{}", fmt::join(value, ", ")).c_str());
528  return found;
529 }
530 
531 
540 template <typename T>
541 void serializeRosMessage(const T& msg, std::vector<uint8_t>& buffer) {
542 
543  const uint32_t length = ros::serialization::serializationLength(msg);
544  buffer.resize(length);
545  ros::serialization::OStream stream(buffer.data(), length);
546  ros::serialization::serialize(stream, msg);
547 }
548 
549 } // namespace mqtt_client
mqtt_client::MqttClient::ClientConfig::qos
int qos
last-will QoS value
Definition: MqttClient.h:357
mqtt_client::MqttClient::ClientConfig::size
int size
client buffer size
Definition: MqttClient.h:351
response
const std::string response
mqtt_client::MqttClient::mqtt2ros
void mqtt2ros(mqtt::const_message_ptr mqtt_msg, const ros::WallTime &arrival_stamp=ros::WallTime::now())
Publishes a ROS message received via MQTT to ROS.
Definition: MqttClient.cpp:607
mqtt_client::MqttClient::Ros2MqttInterface::ros
struct mqtt_client::MqttClient::Ros2MqttInterface::@4 ros
ROS-related variables.
mqtt_client::MqttClient::ClientConfig::password
std::string password
decryption password for private key
Definition: MqttClient.h:366
ros::serialization::OStream
mqtt_client::MqttClient::setupClient
void setupClient()
Sets up the client connection options and initializes the client object.
Definition: MqttClient.cpp:410
mqtt_client::MqttClient::on_success
void on_success(const mqtt::token &token) override
Callback for when a MQTT action succeeds.
Definition: MqttClient.cpp:934
NODELET_ERROR
#define NODELET_ERROR(...)
mqtt_client::MqttClient::resolvePath
std::filesystem::path resolvePath(const std::string &path_string)
Converts a string to a path object resolving paths relative to ROS_HOME.
Definition: MqttClient.cpp:365
mqtt_client::MqttClient::Ros2MqttInterface::qos
int qos
MQTT QoS value.
Definition: MqttClient.h:384
ros::Publisher
mqtt_client::MqttClient::ClientConfig
Struct containing client parameters.
Definition: MqttClient.h:347
mqtt_client::MqttClient::mqtt2primitive
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg)
Publishes a primitive message received via MQTT to ROS.
Definition: MqttClient.cpp:671
boost::shared_ptr
mqtt_client::MqttClient::Mqtt2RosInterface::publisher
ros::Publisher publisher
generic ROS publisher
Definition: MqttClient.h:400
mqtt_client::MqttClient::on_failure
void on_failure(const mqtt::token &token) override
Callback for when a MQTT action fails.
Definition: MqttClient.cpp:940
mqtt_client::MqttClient::Mqtt2RosInterface::mqtt
struct mqtt_client::MqttClient::Mqtt2RosInterface::@6 mqtt
MQTT-related variables.
mqtt_client::MqttClient::ClientConfig::verify
bool verify
Definition: MqttClient.h:368
ros::NodeHandle::getParam
bool getParam(const std::string &key, bool &b) const
mqtt_client::MqttClient::delivery_complete
void delivery_complete(mqtt::delivery_token_ptr token) override
Callback for when delivery for a MQTT message has been completed.
Definition: MqttClient.cpp:931
ros.h
mqtt_client::MqttClient::is_connected_
bool is_connected_
Status variable keeping track of connection status to broker.
Definition: MqttClient.h:445
mqtt_client::MqttClient::ClientConfig::key
std::filesystem::path key
client private keyfile
Definition: MqttClient.h:365
ros::serialization::serializationLength
uint32_t serializationLength(const boost::array< T, N > &t)
NODELET_WARN
#define NODELET_WARN(...)
mqtt_client::MqttClient::ClientConfig::max_inflight
int max_inflight
maximum number of inflight messages
Definition: MqttClient.h:362
mqtt_client::MqttClient::ClientConfig::buffer
struct mqtt_client::MqttClient::ClientConfig::@1 buffer
client buffer-related variables
mqtt_client::MqttClient::loadParameter
bool loadParameter(const std::string &key, std::string &value)
Loads requested ROS parameter from parameter server.
Definition: MqttClient.cpp:339
mqtt_client::MqttClient::client_config_
ClientConfig client_config_
Client parameters.
Definition: MqttClient.h:455
mqtt_client::MqttClient::BrokerConfig::pass
std::string pass
password
Definition: MqttClient.h:336
mqtt_client::MqttClient::ClientConfig::tls
struct mqtt_client::MqttClient::ClientConfig::@3 tls
SSL/TLS-related variables.
mqtt_client::MqttClient::client_
std::shared_ptr< mqtt::async_client > client_
MQTT client variable.
Definition: MqttClient.h:460
mqtt_client::MqttClient::ClientConfig::message
std::string message
last-will message
Definition: MqttClient.h:356
mqtt_client::MqttClient::ClientConfig::version
int version
TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options....
Definition: MqttClient.h:367
mqtt_client::MqttClient::Mqtt2RosInterface
Struct containing variables related to a MQTT2ROS connection.
Definition: MqttClient.h:394
mqtt_client::MqttClient::ClientConfig::topic
std::string topic
last-will topic
Definition: MqttClient.h:355
mqtt_client::MqttClient::kRosMsgTypeMqttTopicPrefix
static const std::string kRosMsgTypeMqttTopicPrefix
MQTT topic prefix under which ROS message type information is published.
Definition: MqttClient.h:418
mqtt_client::MqttClient::BrokerConfig::user
std::string user
username
Definition: MqttClient.h:335
mqtt_client::MqttClient::ClientConfig::clean_session
bool clean_session
whether client requests clean session
Definition: MqttClient.h:360
ros::ServiceServer
mqtt_client::MqttClient::Mqtt2RosInterface::shape_shifter
topic_tools::ShapeShifter shape_shifter
ROS msg type ShapeShifter.
Definition: MqttClient.h:401
mqtt_client::MqttClient::BrokerConfig::port
int port
broker port
Definition: MqttClient.h:334
mqtt_client::MqttClient::ros2mqtt
void ros2mqtt(const topic_tools::ShapeShifter::ConstPtr &ros_msg, const std::string &ros_topic)
Serializes and publishes a generic ROS message to the MQTT broker.
Definition: MqttClient.cpp:490
mqtt_client::MqttClient::ClientConfig::last_will
struct mqtt_client::MqttClient::ClientConfig::@2 last_will
last-will-related variables
mqtt_client::MqttClient::Mqtt2RosInterface::latency_publisher
ros::Publisher latency_publisher
ROS publisher for latency.
Definition: MqttClient.h:402
mqtt_client::MqttClient::Mqtt2RosInterface::ros
struct mqtt_client::MqttClient::Mqtt2RosInterface::@7 ros
ROS-related variables.
mqtt_client::MqttClient::ClientConfig::directory
std::filesystem::path directory
client buffer directory
Definition: MqttClient.h:352
mqtt_client::MqttClient::Mqtt2RosInterface::topic
std::string topic
ROS topic.
Definition: MqttClient.h:399
mqtt_client::MqttClient::connect_options_
mqtt::connect_options connect_options_
MQTT client connection options.
Definition: MqttClient.h:465
mqtt_client::MqttClient::Ros2MqttInterface
Struct containing variables related to a ROS2MQTT connection.
Definition: MqttClient.h:377
mqtt_client::MqttClient::Mqtt2RosInterface::qos
int qos
MQTT QoS value.
Definition: MqttClient.h:396
ros::WallTime::now
static WallTime now()
mqtt_client::MqttClient::Ros2MqttInterface::primitive
bool primitive
whether to publish as primitive message
Definition: MqttClient.h:387
mqtt_client::MqttClient
ROS Nodelet for sending and receiving ROS messages via MQTT.
Definition: MqttClient.h:58
mqtt_client::MqttClient::private_node_handle_
ros::NodeHandle private_node_handle_
Private ROS node handle.
Definition: MqttClient.h:435
mqtt_client::MqttClient::ClientConfig::alpn_protos
std::vector< std::string > alpn_protos
list of ALPN protocols
Definition: MqttClient.h:370
ros::NodeHandle::hasParam
bool hasParam(const std::string &key) const
mqtt_client::MqttClient::message_arrived
void message_arrived(mqtt::const_message_ptr mqtt_msg) override
Callback for when the client receives a MQTT message from the broker.
Definition: MqttClient.cpp:846
mqtt_client::MqttClient::connected
void connected(const std::string &cause) override
Callback for when the client has successfully connected to the broker.
Definition: MqttClient.cpp:801
mqtt_client::MqttClient::isConnectedService
bool isConnectedService(mqtt_client_interfaces::IsConnected::Request &request, mqtt_client_interfaces::IsConnected::Response &response)
ROS service returning whether the client is connected to the broker.
Definition: MqttClient.cpp:837
mqtt_client::MqttClient::broker_config_
BrokerConfig broker_config_
Broker parameters.
Definition: MqttClient.h:450
ros::WallTime
mqtt_client::MqttClient::kLatencyRosTopicPrefix
static const std::string kLatencyRosTopicPrefix
ROS topic prefix under which ROS2MQTT2ROS latencies are published.
Definition: MqttClient.h:425
mqtt_client::MqttClient::mqtt2ros_
std::map< std::string, Mqtt2RosInterface > mqtt2ros_
MQTT2ROS connection variables sorted by MQTT topic.
Definition: MqttClient.h:475
mqtt_client::MqttClient::ClientConfig::certificate
std::filesystem::path certificate
client certificate
Definition: MqttClient.h:364
mqtt_client::MqttClient::ClientConfig::enabled
bool enabled
whether client buffer is enabled
Definition: MqttClient.h:350
mqtt_client::MqttClient::Mqtt2RosInterface::stamped
bool stamped
whether timestamp is injected
Definition: MqttClient.h:408
full.h
mqtt_client::MqttClient::ros2mqtt_
std::map< std::string, Ros2MqttInterface > ros2mqtt_
ROS2MQTT connection variables sorted by ROS topic.
Definition: MqttClient.h:470
mqtt_client::MqttClient::setup
void setup()
Initializes broker connection and subscriptions.
Definition: MqttClient.cpp:384
mqtt_client::MqttClient::connection_lost
void connection_lost(const std::string &cause) override
Callback for when the client has lost connection to the broker.
Definition: MqttClient.cpp:823
mqtt_client::MqttClient::Ros2MqttInterface::stamped
bool stamped
whether to inject timestamp in MQTT message
Definition: MqttClient.h:388
mqtt_client::MqttClient::is_connected_service_
ros::ServiceServer is_connected_service_
ROS Service server for providing connection status.
Definition: MqttClient.h:440
mqtt_client::MqttClient::Ros2MqttInterface::mqtt
struct mqtt_client::MqttClient::Ros2MqttInterface::@5 mqtt
MQTT-related variables.
shape_shifter.h
mqtt_client::MqttClient::BrokerConfig::ca_certificate
std::filesystem::path ca_certificate
public CA certificate trusted by client
Definition: MqttClient.h:340
nodelet::Nodelet
mqtt_client::MqttClient::ClientConfig::keep_alive_interval
double keep_alive_interval
keep-alive interval
Definition: MqttClient.h:361
mqtt_client::MqttClient::Ros2MqttInterface::topic
std::string topic
MQTT topic.
Definition: MqttClient.h:383
mqtt_client::MqttClient::Mqtt2RosInterface::latched
bool latched
whether to latch ROS message
Definition: MqttClient.h:404
mqtt_client::MqttClient::Mqtt2RosInterface::queue_size
int queue_size
ROS publisher queue size.
Definition: MqttClient.h:403
nodelet.h
mqtt_client::MqttClient::Ros2MqttInterface::subscriber
ros::Subscriber subscriber
generic ROS subscriber
Definition: MqttClient.h:379
mqtt_client::MqttClient::BrokerConfig::host
std::string host
broker host
Definition: MqttClient.h:333
mqtt_client::MqttClient::connect
void connect()
Connects to the broker using the member client and options.
Definition: MqttClient.cpp:472
default_value
def default_value(type_)
mqtt_client::serializeRosMessage
void serializeRosMessage(const T &msg, std::vector< uint8_t > &buffer)
Definition: MqttClient.h:541
ros::NodeHandle::param
T param(const std::string &param_name, const T &default_val) const
mqtt_client::MqttClient::ClientConfig::id
std::string id
client unique ID
Definition: MqttClient.h:348
mqtt_client::MqttClient::Ros2MqttInterface::queue_size
int queue_size
ROS subscriber queue size.
Definition: MqttClient.h:380
mqtt_client::MqttClient::BrokerConfig::enabled
bool enabled
whether to connect via SSL/TLS
Definition: MqttClient.h:338
mqtt_client::MqttClient::BrokerConfig::tls
struct mqtt_client::MqttClient::BrokerConfig::@0 tls
SSL/TLS-related variables.
topic_tools::ShapeShifter
mqtt_client::MqttClient::isConnected
bool isConnected()
Returns whether the client is connected to the broker.
Definition: MqttClient.cpp:831
mqtt_client
Namespace for the mqtt_client package.
Definition: MqttClient.h:46
ros::serialization::serialize
void serialize(Stream &stream, const boost::array< T, N > &t)
mqtt_client::MqttClient::Mqtt2RosInterface::primitive
bool primitive
Definition: MqttClient.h:406
mqtt_client::MqttClient::BrokerConfig
Struct containing broker parameters.
Definition: MqttClient.h:332
mqtt_client::MqttClient::ClientConfig::retained
bool retained
whether last-will is retained
Definition: MqttClient.h:358
mqtt_client::MqttClient::onInit
virtual void onInit() override
Initializes nodelet when nodelet is loaded.
Definition: MqttClient.cpp:147
mqtt_client::MqttClient::loadParameters
void loadParameters()
Loads ROS parameters from parameter server.
Definition: MqttClient.cpp:158
mqtt_client::MqttClient::node_handle_
ros::NodeHandle node_handle_
ROS node handle.
Definition: MqttClient.h:430
mqtt_client::MqttClient::Ros2MqttInterface::retained
bool retained
whether to retain MQTT message
Definition: MqttClient.h:385
ros::NodeHandle
ros::Subscriber
NODELET_DEBUG
#define NODELET_DEBUG(...)


mqtt_client
Author(s): Lennart Reiher , Bastian Lampe , Christian Wende
autogenerated on Thu Oct 5 2023 02:09:10