34 #include <mqtt_client_interfaces/msg/ros_msg_type.hpp>
35 #include <rcpputils/env.hpp>
36 #include <std_msgs/msg/bool.hpp>
37 #include <std_msgs/msg/char.hpp>
38 #include <std_msgs/msg/float32.hpp>
39 #include <std_msgs/msg/float64.hpp>
40 #include <std_msgs/msg/int16.hpp>
41 #include <std_msgs/msg/int32.hpp>
42 #include <std_msgs/msg/int64.hpp>
43 #include <std_msgs/msg/int8.hpp>
44 #include <std_msgs/msg/string.hpp>
45 #include <std_msgs/msg/u_int16.hpp>
46 #include <std_msgs/msg/u_int32.hpp>
47 #include <std_msgs/msg/u_int64.hpp>
48 #include <std_msgs/msg/u_int8.hpp>
50 #include <rclcpp_components/register_node_macro.hpp>
58 "mqtt_client/ros_msg_type/";
79 const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
80 const std::string& msg_type, std::string& primitive) {
82 bool found_primitive =
true;
84 if (
msg_type ==
"std_msgs/msg/String") {
85 std_msgs::msg::String msg;
88 }
else if (
msg_type ==
"std_msgs/msg/Bool") {
89 std_msgs::msg::Bool msg;
91 primitive = msg.data ?
"true" :
"false";
92 }
else if (
msg_type ==
"std_msgs/msg/Char") {
93 std_msgs::msg::Char msg;
95 primitive = std::to_string(msg.data);
96 }
else if (
msg_type ==
"std_msgs/msg/UInt8") {
97 std_msgs::msg::UInt8 msg;
99 primitive = std::to_string(msg.data);
100 }
else if (
msg_type ==
"std_msgs/msg/UInt16") {
101 std_msgs::msg::UInt16 msg;
103 primitive = std::to_string(msg.data);
104 }
else if (
msg_type ==
"std_msgs/msg/UInt32") {
105 std_msgs::msg::UInt32 msg;
107 primitive = std::to_string(msg.data);
108 }
else if (
msg_type ==
"std_msgs/msg/UInt64") {
109 std_msgs::msg::UInt64 msg;
111 primitive = std::to_string(msg.data);
112 }
else if (
msg_type ==
"std_msgs/msg/Int8") {
113 std_msgs::msg::Int8 msg;
115 primitive = std::to_string(msg.data);
116 }
else if (
msg_type ==
"std_msgs/msg/Int16") {
117 std_msgs::msg::Int16 msg;
119 primitive = std::to_string(msg.data);
120 }
else if (
msg_type ==
"std_msgs/msg/Int32") {
121 std_msgs::msg::Int32 msg;
123 primitive = std::to_string(msg.data);
124 }
else if (
msg_type ==
"std_msgs/msg/Int64") {
125 std_msgs::msg::Int64 msg;
127 primitive = std::to_string(msg.data);
128 }
else if (
msg_type ==
"std_msgs/msg/Float32") {
129 std_msgs::msg::Float32 msg;
131 primitive = std::to_string(msg.data);
132 }
else if (
msg_type ==
"std_msgs/msg/Float64") {
133 std_msgs::msg::Float64 msg;
135 primitive = std::to_string(msg.data);
137 found_primitive =
false;
140 return found_primitive;
153 rcl_interfaces::msg::ParameterDescriptor param_desc;
155 param_desc.description =
"IP address or hostname of the machine running the MQTT broker";
156 declare_parameter(
"broker.host", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
157 param_desc.description =
"port the MQTT broker is listening on";
158 declare_parameter(
"broker.port", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
159 param_desc.description =
"username used for authenticating to the broker (if empty, will try to connect anonymously)";
160 declare_parameter(
"broker.user", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
161 param_desc.description =
"password used for authenticating to the broker";
162 declare_parameter(
"broker.pass", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
163 param_desc.description =
"whether to connect via SSL/TLS";
164 declare_parameter(
"broker.tls.enabled", rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
165 param_desc.description =
"CA certificate file trusted by client (relative to ROS_HOME)";
166 declare_parameter(
"broker.tls.ca_certificate", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
168 param_desc.description =
"unique ID used to identify the client (broker may allow empty ID and automatically generate one)";
169 declare_parameter(
"client.id", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
170 param_desc.description =
"maximum number of messages buffered by the bridge when not connected to broker (only available if client ID is not empty)";
171 declare_parameter(
"client.buffer.size", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
172 param_desc.description =
"directory used to buffer messages when not connected to broker (relative to ROS_HOME)";
173 declare_parameter(
"client.buffer.directory", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
174 param_desc.description =
"topic used for this client's last-will message (no last will, if not specified)";
175 declare_parameter(
"client.last_will.topic", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
176 param_desc.description =
"last-will message";
177 declare_parameter(
"client.last_will.message", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
178 param_desc.description =
"QoS value for last-will message";
179 declare_parameter(
"client.last_will.qos", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
180 param_desc.description =
"whether to retain last-will message";
181 declare_parameter(
"client.last_will.retained", rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
182 param_desc.description =
"whether to use a clean session for this client";
183 declare_parameter(
"client.clean_session", rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
184 param_desc.description =
"keep-alive interval in seconds";
185 declare_parameter(
"client.keep_alive_interval", rclcpp::ParameterType::PARAMETER_DOUBLE, param_desc);
186 param_desc.description =
"maximum number of inflight messages";
187 declare_parameter(
"client.max_inflight", rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
188 param_desc.description =
"client certificate file (only needed if broker requires client certificates; relative to ROS_HOME)";
189 declare_parameter(
"client.tls.certificate", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
190 param_desc.description =
"client private key file (relative to ROS_HOME)";
191 declare_parameter(
"client.tls.key", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
192 param_desc.description =
"client private key password";
193 declare_parameter(
"client.tls.password", rclcpp::ParameterType::PARAMETER_STRING, param_desc);
195 param_desc.description =
"The list of topics to bridge from ROS to MQTT";
196 const auto ros2mqtt_ros_topics = declare_parameter<std::vector<std::string>>(
"bridge.ros2mqtt.ros_topics", std::vector<std::string>(), param_desc);
197 for (
const auto& ros_topic : ros2mqtt_ros_topics)
199 param_desc.description =
"MQTT topic on which the corresponding ROS messages are sent to the broker";
200 declare_parameter(fmt::format(
"bridge.ros2mqtt.{}.mqtt_topic", ros_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
201 param_desc.description =
"whether to publish as primitive message";
202 declare_parameter(fmt::format(
"bridge.ros2mqtt.{}.primitive", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
203 param_desc.description =
"whether to attach a timestamp to a ROS2MQTT payload (for latency computation on receiver side)";
204 declare_parameter(fmt::format(
"bridge.ros2mqtt.{}.inject_timestamp", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
205 param_desc.description =
"ROS subscriber queue size";
206 declare_parameter(fmt::format(
"bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
207 param_desc.description =
"MQTT QoS value";
208 declare_parameter(fmt::format(
"bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
209 param_desc.description =
"whether to retain MQTT message";
210 declare_parameter(fmt::format(
"bridge.ros2mqtt.{}.advanced.mqtt.retained", ros_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
213 param_desc.description =
"The list of topics to bridge from MQTT to ROS";
214 const auto mqtt2ros_mqtt_topics = declare_parameter<std::vector<std::string>>(
"bridge.mqtt2ros.mqtt_topics", std::vector<std::string>(), param_desc);
215 for (
const auto& mqtt_topic : mqtt2ros_mqtt_topics)
217 param_desc.description =
"ROS topic on which corresponding MQTT messages are published";
218 declare_parameter(fmt::format(
"bridge.mqtt2ros.{}.ros_topic", mqtt_topic), rclcpp::ParameterType::PARAMETER_STRING, param_desc);
219 param_desc.description =
"whether to publish as primitive message (if coming from non-ROS MQTT client)";
220 declare_parameter(fmt::format(
"bridge.mqtt2ros.{}.primitive", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
221 param_desc.description =
"MQTT QoS value";
222 declare_parameter(fmt::format(
"bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
223 param_desc.description =
"ROS publisher queue size";
224 declare_parameter(fmt::format(
"bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic), rclcpp::ParameterType::PARAMETER_INTEGER, param_desc);
225 param_desc.description =
"whether to latch ROS message";
226 declare_parameter(fmt::format(
"bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), rclcpp::ParameterType::PARAMETER_BOOL, param_desc);
230 std::string broker_tls_ca_certificate;
237 loadParameter(
"broker.tls.ca_certificate", broker_tls_ca_certificate,
238 "/etc/ssl/certs/ca-certificates.crt");
242 std::string client_buffer_directory, client_tls_certificate, client_tls_key;
247 loadParameter(
"client.buffer.directory", client_buffer_directory,
"buffer");
249 RCLCPP_WARN(get_logger(),
250 "Client buffer can not be enabled when client ID is empty");
264 if (
loadParameter(
"client.tls.certificate", client_tls_certificate)) {
282 for (
const auto& ros_topic : ros2mqtt_ros_topics) {
284 rclcpp::Parameter mqtt_topic_param;
285 if (get_parameter(fmt::format(
"bridge.ros2mqtt.{}.mqtt_topic", ros_topic), mqtt_topic_param)) {
288 const std::string mqtt_topic = mqtt_topic_param.as_string();
293 rclcpp::Parameter primitive_param;
294 if (get_parameter(fmt::format(
"bridge.ros2mqtt.{}.primitive", ros_topic), primitive_param))
295 ros2mqtt.primitive = primitive_param.as_bool();
298 rclcpp::Parameter stamped_param;
299 if (get_parameter(fmt::format(
"bridge.ros2mqtt.{}.inject_timestamp", ros_topic), stamped_param))
300 ros2mqtt.stamped = stamped_param.as_bool();
304 "Timestamp will not be injected into primitive messages on ROS "
311 rclcpp::Parameter queue_size_param;
312 if (get_parameter(fmt::format(
"bridge.ros2mqtt.{}.advanced.ros.queue_size", ros_topic),
314 ros2mqtt.ros.queue_size = queue_size_param.as_int();
317 rclcpp::Parameter qos_param;
318 if (get_parameter(fmt::format(
"bridge.ros2mqtt.{}.advanced.mqtt.qos", ros_topic), qos_param))
319 ros2mqtt.mqtt.qos = qos_param.as_int();
322 rclcpp::Parameter retained_param;
323 if (get_parameter(fmt::format(
"bridge.ros2mqtt.{}.advanced.mqtt.retained", ros_topic), retained_param))
324 ros2mqtt.mqtt.retained = retained_param.as_bool();
326 RCLCPP_INFO(get_logger(),
"Bridging %sROS topic '%s' to MQTT topic '%s' %s",
327 ros2mqtt.primitive ?
"primitive " :
"", ros_topic.c_str(),
329 ros2mqtt.stamped ?
"and measuring latency" :
"");
331 RCLCPP_WARN(get_logger(),
332 fmt::format(
"Parameter 'bridge.ros2mqtt.{}' is missing subparameter "
333 "'mqtt_topic', will be ignored", ros_topic).c_str());
338 for (
const auto& mqtt_topic : mqtt2ros_mqtt_topics) {
340 rclcpp::Parameter ros_topic_param;
341 if (get_parameter(fmt::format(
"bridge.mqtt2ros.{}.ros_topic", mqtt_topic), ros_topic_param)) {
344 const std::string ros_topic = ros_topic_param.as_string();
349 rclcpp::Parameter primitive_param;
350 if (get_parameter(fmt::format(
"bridge.mqtt2ros.{}.primitive", mqtt_topic), primitive_param))
351 mqtt2ros.primitive = primitive_param.as_bool();
354 rclcpp::Parameter qos_param;
355 if (get_parameter(fmt::format(
"bridge.mqtt2ros.{}.advanced.mqtt.qos", mqtt_topic), qos_param))
356 mqtt2ros.mqtt.qos = qos_param.as_int();
359 rclcpp::Parameter queue_size_param;
360 if (get_parameter(fmt::format(
"bridge.mqtt2ros.{}.advanced.ros.queue_size", mqtt_topic),
362 mqtt2ros.ros.queue_size = queue_size_param.as_int();
365 rclcpp::Parameter latched_param;
366 if (get_parameter(fmt::format(
"bridge.mqtt2ros.{}.advanced.ros.latched", mqtt_topic), latched_param)) {
367 mqtt2ros.ros.latched = latched_param.as_bool();
368 RCLCPP_WARN(get_logger(),
369 fmt::format(
"Parameter 'bridge.mqtt2ros.{}.advanced.ros.latched' is ignored "
370 "since ROS 2 does not easily support latched topics.", mqtt_topic).c_str());
373 RCLCPP_INFO(get_logger(),
"Bridging MQTT topic '%s' to %sROS topic '%s'",
374 mqtt_topic.c_str(),
mqtt2ros.primitive ?
"primitive " :
"",
378 RCLCPP_WARN(get_logger(),
379 fmt::format(
"Parameter 'bridge.ros2mqtt.{}' is missing subparameter "
380 "'ros_topic', will be ignored", mqtt_topic).c_str());
385 RCLCPP_ERROR(get_logger(),
386 "No valid ROS-MQTT bridge found in parameter 'bridge'");
393 bool found = get_parameter(key, value);
395 RCLCPP_DEBUG(get_logger(),
"Retrieved parameter '%s' = '%s'", key.c_str(),
402 const std::string& default_value) {
403 bool found = get_parameter_or(key, value, default_value);
405 RCLCPP_WARN(get_logger(),
"Parameter '%s' not set, defaulting to '%s'",
408 RCLCPP_DEBUG(get_logger(),
"Retrieved parameter '%s' = '%s'", key.c_str(),
416 std::filesystem::path path(path_string);
417 if (path_string.empty())
return path;
418 if (!path.has_root_path()) {
419 std::string ros_home = rcpputils::get_env_var(
"ROS_HOME");
420 if (ros_home.empty())
421 ros_home = std::string(std::filesystem::current_path());
422 path = std::filesystem::path(ros_home);
423 path.append(path_string);
425 if (!std::filesystem::exists(path))
426 RCLCPP_WARN(get_logger(),
"Requested path '%s' does not exist",
427 std::string(path).c_str());
435 builtin_interfaces::msg::Time tmp_stamp;
436 rclcpp::SerializedMessage tmp_serialized_stamp;
448 create_service<mqtt_client_interfaces::srv::IsConnected>(
450 std::placeholders::_1, std::placeholders::_2));
455 create_wall_timer(std::chrono::duration<double>(1.0),
463 const auto all_topics_and_types = get_topic_names_and_types();
467 if (all_topics_and_types.count(ros_topic)) {
470 const std::string&
msg_type = all_topics_and_types.at(ros_topic)[0];
475 std::function<void(
const std::shared_ptr<rclcpp::SerializedMessage> msg)>
477 std::placeholders::_1, ros_topic);
479 ros2mqtt.ros.subscriber = create_generic_subscription(
481 }
catch (rclcpp::exceptions::RCLError& e) {
482 RCLCPP_ERROR(get_logger(),
"Failed to create generic subscriber: %s",
486 RCLCPP_INFO(get_logger(),
"Subscribed ROS topic '%s' of type '%s'",
487 ros_topic.c_str(),
msg_type.c_str());
509 mqtt::will_options will(
517 mqtt::ssl_options ssl;
538 client_ = std::shared_ptr<mqtt::async_client>(
new mqtt::async_client(
542 client_ = std::shared_ptr<mqtt::async_client>(
545 }
catch (
const mqtt::exception& e) {
546 RCLCPP_ERROR(get_logger(),
"Client could not be initialized: %s", e.what());
557 std::string as_client =
561 RCLCPP_INFO(get_logger(),
"Connecting to broker at '%s'%s ...",
562 client_->get_server_uri().c_str(), as_client.c_str());
566 }
catch (
const mqtt::exception& e) {
567 RCLCPP_ERROR(get_logger(),
"Connection to broker failed: %s", e.what());
574 const std::shared_ptr<rclcpp::SerializedMessage>& serialized_msg,
575 const std::string& ros_topic) {
578 std::string mqtt_topic =
ros2mqtt.mqtt.topic;
579 std::vector<uint8_t> payload_buffer;
582 mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
583 ros_msg_type.name =
ros2mqtt.ros.msg_type;
584 ros_msg_type.stamped =
ros2mqtt.stamped;
586 RCLCPP_DEBUG(get_logger(),
"Received ROS message of type '%s' on topic '%s'",
587 ros_msg_type.name.c_str(), ros_topic.c_str());
593 bool found_primitive =
595 if (found_primitive) {
596 payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
598 RCLCPP_WARN(get_logger(),
599 "Cannot send ROS message of type '%s' as primitive message, "
600 "check supported primitive types",
601 ros_msg_type.name.c_str());
608 rclcpp::SerializedMessage serialized_ros_msg_type;
610 uint32_t msg_type_length = serialized_ros_msg_type.size();
611 std::vector<uint8_t> msg_type_buffer = std::vector<uint8_t>(
612 serialized_ros_msg_type.get_rcl_serialized_message().buffer,
613 serialized_ros_msg_type.get_rcl_serialized_message().buffer + msg_type_length);
618 RCLCPP_DEBUG(get_logger(),
619 "Sending ROS message type to MQTT broker on topic '%s' ...",
621 mqtt::message_ptr mqtt_msg =
622 mqtt::make_message(mqtt_topic, msg_type_buffer.data(),
623 msg_type_buffer.size(),
ros2mqtt.mqtt.qos,
true);
625 }
catch (
const mqtt::exception& e) {
628 "Publishing ROS message type information to MQTT topic '%s' failed: %s",
629 mqtt_topic.c_str(), e.what());
634 uint32_t msg_length = serialized_msg->size();
635 uint32_t payload_length = msg_length;
636 uint32_t msg_offset = 0;
642 payload_buffer.resize(payload_length);
645 std::copy(serialized_msg->get_rcl_serialized_message().buffer,
646 serialized_msg->get_rcl_serialized_message().buffer + msg_length,
647 payload_buffer.begin() + msg_offset);
651 payload_buffer = std::vector<uint8_t>(
652 serialized_msg->get_rcl_serialized_message().buffer,
653 serialized_msg->get_rcl_serialized_message().buffer + msg_length);
660 builtin_interfaces::msg::Time stamp(rclcpp::Clock(RCL_SYSTEM_TIME).now());
663 rclcpp::SerializedMessage serialized_stamp;
666 serialized_stamp.get_rcl_serialized_message().buffer,
667 serialized_stamp.get_rcl_serialized_message().buffer +
stamp_length_,
668 payload_buffer.begin());
677 "Sending ROS message of type '%s' to MQTT broker on topic '%s' ...",
678 ros_msg_type.name.c_str(), mqtt_topic.c_str());
679 mqtt::message_ptr mqtt_msg = mqtt::make_message(
680 mqtt_topic, payload_buffer.data(), payload_buffer.size(),
683 }
catch (
const mqtt::exception& e) {
686 "Publishing ROS message type information to MQTT topic '%s' failed: %s",
687 mqtt_topic.c_str(), e.what());
693 const rclcpp::Time& arrival_stamp) {
695 std::string mqtt_topic = mqtt_msg->get_topic();
697 auto& payload = mqtt_msg->get_payload_ref();
698 uint32_t payload_length =
static_cast<uint32_t
>(payload.size());
702 uint32_t msg_length = payload_length;
703 uint32_t msg_offset = 0;
710 std::memcpy(serialized_stamp.get_rcl_serialized_message().buffer,
712 serialized_stamp.get_rcl_serialized_message().buffer_length =
stamp_length_;
715 builtin_interfaces::msg::Time stamp;
719 rclcpp::Duration latency = arrival_stamp - stamp;
720 std_msgs::msg::Float64 latency_msg;
721 latency_msg.data = latency.seconds();
724 if (!
mqtt2ros.ros.latency_publisher) {
726 latency_topic.replace(latency_topic.find(
"//"), 2,
"/");
728 create_publisher<std_msgs::msg::Float64>(latency_topic, 1);
730 mqtt2ros.ros.latency_publisher->publish(latency_msg);
737 rclcpp::SerializedMessage serialized_msg(msg_length);
738 std::memcpy(serialized_msg.get_rcl_serialized_message().buffer,
739 &(payload[msg_offset]), msg_length);
740 serialized_msg.get_rcl_serialized_message().buffer_length = msg_length;
745 "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
747 mqtt2ros.ros.publisher->publish(serialized_msg);
753 std::string mqtt_topic = mqtt_msg->get_topic();
755 const std::string str_msg = mqtt_msg->to_string();
757 bool found_primitive =
false;
758 std::string ros_msg_type =
"std_msgs/msg/String";
759 rclcpp::SerializedMessage serialized_msg;
762 if (!found_primitive) {
763 std::string bool_str = str_msg;
764 std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(),
766 if (bool_str ==
"true" || bool_str ==
"false") {
768 bool bool_msg = (bool_str ==
"true");
771 std_msgs::msg::Bool msg;
775 ros_msg_type =
"std_msgs/msg/Bool";
776 found_primitive =
true;
781 if (!found_primitive) {
784 const int int_msg = std::stoi(str_msg, &pos);
785 if (pos == str_msg.size()) {
788 std_msgs::msg::Int32 msg;
792 ros_msg_type =
"std_msgs/msg/Int32";
793 found_primitive =
true;
795 }
catch (
const std::invalid_argument& ex) {
796 }
catch (
const std::out_of_range& ex) {
801 if (!found_primitive) {
804 const float float_msg = std::stof(str_msg, &pos);
805 if (pos == str_msg.size()) {
808 std_msgs::msg::Float32 msg;
809 msg.data = float_msg;
812 ros_msg_type =
"std_msgs/msg/Float32";
813 found_primitive =
true;
815 }
catch (
const std::invalid_argument& ex) {
816 }
catch (
const std::out_of_range& ex) {
821 if (!found_primitive) {
824 std_msgs::msg::String msg;
830 if (ros_msg_type !=
mqtt2ros.ros.msg_type) {
832 mqtt2ros.ros.msg_type = ros_msg_type;
833 RCLCPP_INFO(get_logger(),
834 "ROS publisher message type on topic '%s' set to '%s'",
835 mqtt2ros.ros.topic.c_str(), ros_msg_type.c_str());
839 mqtt2ros.ros.publisher = create_generic_publisher(
841 }
catch (rclcpp::exceptions::RCLError& e) {
842 RCLCPP_ERROR(get_logger(),
"Failed to create generic publisher: %s",
851 "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
853 mqtt2ros.ros.publisher->publish(serialized_msg);
862 std::string as_client =
866 RCLCPP_INFO(get_logger(),
"Connected to broker at '%s'%s",
867 client_->get_server_uri().c_str(), as_client.c_str());
871 std::string mqtt_topic_to_subscribe = mqtt_topic;
875 RCLCPP_INFO(get_logger(),
"Subscribed MQTT topic '%s'", mqtt_topic_to_subscribe.c_str());
884 RCLCPP_ERROR(get_logger(),
885 "Connection to broker lost, will try to reconnect...");
898 mqtt_client_interfaces::srv::IsConnected::Request::SharedPtr request,
899 mqtt_client_interfaces::srv::IsConnected::Response::SharedPtr response) {
909 rclcpp::Time arrival_stamp(
910 builtin_interfaces::msg::Time(rclcpp::Clock(RCL_SYSTEM_TIME).now()));
912 std::string mqtt_topic = mqtt_msg->get_topic();
913 RCLCPP_DEBUG(get_logger(),
"Received MQTT message on topic '%s'",
926 bool msg_contains_ros_msg_type =
928 if (msg_contains_ros_msg_type) {
931 auto& payload = mqtt_msg->get_payload_ref();
932 uint32_t payload_length =
static_cast<uint32_t
>(payload.size());
933 rclcpp::SerializedMessage serialized_ros_msg_type(payload_length);
934 std::memcpy(serialized_ros_msg_type.get_rcl_serialized_message().buffer,
935 &(payload[0]), payload_length);
936 serialized_ros_msg_type.get_rcl_serialized_message().buffer_length = payload_length;
939 mqtt_client_interfaces::msg::RosMsgType ros_msg_type;
943 std::string mqtt_data_topic = mqtt_topic;
949 if (ros_msg_type.name !=
mqtt2ros.ros.msg_type) {
951 mqtt2ros.ros.msg_type = ros_msg_type.name;
952 mqtt2ros.stamped = ros_msg_type.stamped;
953 RCLCPP_INFO(get_logger(),
954 "ROS publisher message type on topic '%s' set to '%s'",
955 mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
959 mqtt2ros.ros.publisher = create_generic_publisher(
961 }
catch (rclcpp::exceptions::RCLError& e) {
962 RCLCPP_ERROR(get_logger(),
"Failed to create generic publisher: %s",
969 RCLCPP_DEBUG(get_logger(),
"Subscribed MQTT topic '%s'",
970 mqtt_data_topic.c_str());
981 "ROS publisher for data from MQTT topic '%s' is not yet initialized: "
982 "ROS message type not yet known",
1006 "Connection to broker failed (return code %d), will automatically "
1008 token.get_return_code());