34 #include <mqtt_client_interfaces/RosMsgType.h>
37 #include <std_msgs/Bool.h>
38 #include <std_msgs/Char.h>
39 #include <std_msgs/Float32.h>
40 #include <std_msgs/Float64.h>
41 #include <std_msgs/Int16.h>
42 #include <std_msgs/Int32.h>
43 #include <std_msgs/Int64.h>
44 #include <std_msgs/Int8.h>
45 #include <std_msgs/String.h>
46 #include <std_msgs/UInt16.h>
47 #include <std_msgs/UInt32.h>
48 #include <std_msgs/UInt64.h>
49 #include <std_msgs/UInt8.h>
61 "mqtt_client/ros_msg_type/";
96 std::string& primitive) {
98 bool found_primitive =
true;
99 const std::string& msg_type_md5 = msg->getMD5Sum();
102 primitive = msg->instantiate<std_msgs::String>()->data;
103 }
else if (msg_type_md5 ==
105 primitive = msg->instantiate<std_msgs::Bool>()->data ?
"true" :
"false";
106 }
else if (msg_type_md5 ==
108 primitive = std::to_string(msg->instantiate<std_msgs::Char>()->data);
109 }
else if (msg_type_md5 ==
111 primitive = std::to_string(msg->instantiate<std_msgs::UInt8>()->data);
112 }
else if (msg_type_md5 ==
114 primitive = std::to_string(msg->instantiate<std_msgs::UInt16>()->data);
115 }
else if (msg_type_md5 ==
117 primitive = std::to_string(msg->instantiate<std_msgs::UInt32>()->data);
118 }
else if (msg_type_md5 ==
120 primitive = std::to_string(msg->instantiate<std_msgs::UInt64>()->data);
121 }
else if (msg_type_md5 ==
123 primitive = std::to_string(msg->instantiate<std_msgs::Int8>()->data);
124 }
else if (msg_type_md5 ==
126 primitive = std::to_string(msg->instantiate<std_msgs::Int16>()->data);
127 }
else if (msg_type_md5 ==
129 primitive = std::to_string(msg->instantiate<std_msgs::Int32>()->data);
130 }
else if (msg_type_md5 ==
132 primitive = std::to_string(msg->instantiate<std_msgs::Int64>()->data);
133 }
else if (msg_type_md5 ==
135 primitive = std::to_string(msg->instantiate<std_msgs::Float32>()->data);
136 }
else if (msg_type_md5 ==
138 primitive = std::to_string(msg->instantiate<std_msgs::Float64>()->data);
140 found_primitive =
false;
143 return found_primitive;
161 std::string broker_tls_ca_certificate;
168 loadParameter(
"broker/tls/ca_certificate", broker_tls_ca_certificate,
169 "/etc/ssl/certs/ca-certificates.crt");
173 std::string client_buffer_directory, client_tls_certificate, client_tls_key;
178 loadParameter(
"client/buffer/directory", client_buffer_directory,
"buffer");
180 NODELET_WARN(
"Client buffer can not be enabled when client ID is empty");
194 if (
loadParameter(
"client/tls/certificate", client_tls_certificate)) {
224 for (
int k = 0; k < ros2mqtt_params.
size(); k++) {
226 if (ros2mqtt_params[k].hasMember(
"ros_topic") &&
227 ros2mqtt_params[k].hasMember(
"mqtt_topic")) {
230 std::string& ros_topic = ros2mqtt_params[k][
"ros_topic"];
232 ros2mqtt.mqtt.topic = std::string(ros2mqtt_params[k][
"mqtt_topic"]);
235 if (ros2mqtt_params[k].hasMember(
"primitive"))
236 ros2mqtt.primitive = ros2mqtt_params[k][
"primitive"];
239 if (ros2mqtt_params[k].hasMember(
"inject_timestamp"))
240 ros2mqtt.stamped = ros2mqtt_params[k][
"inject_timestamp"];
243 "Timestamp will not be injected into primitive messages on ROS "
250 if (ros2mqtt_params[k].hasMember(
"advanced")) {
252 ros2mqtt_params[k][
"advanced"];
254 if (advanced_params[
"ros"].hasMember(
"queue_size"))
255 ros2mqtt.ros.queue_size = advanced_params[
"ros"][
"queue_size"];
258 if (advanced_params[
"mqtt"].hasMember(
"qos"))
259 ros2mqtt.mqtt.qos = advanced_params[
"mqtt"][
"qos"];
260 if (advanced_params[
"mqtt"].hasMember(
"retained"))
261 ros2mqtt.mqtt.retained = advanced_params[
"mqtt"][
"retained"];
265 NODELET_INFO(
"Bridging %sROS topic '%s' to MQTT topic '%s' %s",
266 ros2mqtt.primitive ?
"primitive " :
"",
267 ros_topic.c_str(),
ros2mqtt.mqtt.topic.c_str(),
268 ros2mqtt.stamped ?
"and measuring latency" :
"");
271 "Parameter 'bridge/ros2mqtt[%d]' is missing subparameter "
272 "'ros_topic' or 'mqtt_topic', will be ignored",
283 for (
int k = 0; k < mqtt2ros_params.
size(); k++) {
286 if (mqtt2ros_params[k].hasMember(
"mqtt_topic") &&
287 mqtt2ros_params[k].hasMember(
"ros_topic")) {
290 std::string& mqtt_topic = mqtt2ros_params[k][
"mqtt_topic"];
292 mqtt2ros.ros.topic = std::string(mqtt2ros_params[k][
"ros_topic"]);
295 if (mqtt2ros_params[k].hasMember(
"primitive"))
296 mqtt2ros.primitive = mqtt2ros_params[k][
"primitive"];
299 if (mqtt2ros_params[k].hasMember(
"advanced")) {
301 mqtt2ros_params[k][
"advanced"];
303 if (advanced_params[
"mqtt"].hasMember(
"qos"))
304 mqtt2ros.mqtt.qos = advanced_params[
"mqtt"][
"qos"];
307 if (advanced_params[
"ros"].hasMember(
"queue_size"))
308 mqtt2ros.ros.queue_size = advanced_params[
"ros"][
"queue_size"];
309 if (advanced_params[
"ros"].hasMember(
"latched"))
310 mqtt2ros.ros.latched = advanced_params[
"ros"][
"latched"];
315 "Bridging MQTT topic '%s' to %sROS topic '%s'", mqtt_topic.c_str(),
319 "Parameter 'bridge/mqtt2ros[%d]' is missing subparameter "
320 "'mqtt_topic' or 'ros_topic', will be ignored",
327 NODELET_ERROR(
"No valid ROS-MQTT bridge found in parameter 'bridge'");
332 NODELET_ERROR(
"Parameter 'bridge' could not be parsed (XmlRpc %d: %s)",
342 NODELET_DEBUG(
"Retrieved parameter '%s' = '%s'", key.c_str(),
349 const std::string& default_value) {
354 NODELET_ERROR(
"Parameter '%s' has wrong data type", key.c_str());
355 NODELET_WARN(
"Parameter '%s' not set, defaulting to '%s'", key.c_str(),
359 NODELET_DEBUG(
"Retrieved parameter '%s' = '%s'", key.c_str(),
367 std::filesystem::path path(path_string);
368 if (path_string.empty())
return path;
369 if (!path.has_root_path()) {
370 std::string ros_home;
372 if (ros_home.empty())
373 ros_home = std::string(std::filesystem::current_path());
374 path = std::filesystem::path(ros_home);
375 path.append(path_string);
377 if (!std::filesystem::exists(path))
379 std::string(path).c_str());
398 const std::string& ros_topic = ros2mqtt_p.first;
400 const std::string& mqtt_topic =
ros2mqtt.mqtt.topic;
405 NODELET_DEBUG(
"Subscribed ROS topic '%s'", ros_topic.c_str());
426 mqtt::will_options will(
434 mqtt::ssl_options ssl;
455 client_ = std::shared_ptr<mqtt::async_client>(
new mqtt::async_client(
459 client_ = std::shared_ptr<mqtt::async_client>(
462 }
catch (
const mqtt::exception& e) {
463 ROS_ERROR(
"Client could not be initialized: %s", e.what());
474 std::string as_client =
479 client_->get_server_uri().c_str(), as_client.c_str());
483 }
catch (
const mqtt::exception& e) {
484 ROS_ERROR(
"Connection to broker failed: %s", e.what());
491 const std::string& ros_topic) {
494 std::string mqtt_topic =
ros2mqtt.mqtt.topic;
495 std::vector<uint8_t> payload_buffer;
498 mqtt_client_interfaces::RosMsgType ros_msg_type;
499 ros_msg_type.md5 = ros_msg->getMD5Sum();
500 ros_msg_type.name = ros_msg->getDataType();
501 ros_msg_type.definition = ros_msg->getMessageDefinition();
502 ros_msg_type.stamped =
ros2mqtt.stamped;
504 NODELET_DEBUG(
"Received ROS message of type '%s' on topic '%s'",
505 ros_msg_type.name.c_str(), ros_topic.c_str());
512 if (found_primitive) {
513 payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
516 "Cannot send ROS message of type '%s' as primitive message, "
517 "check supported primitive types",
518 ros_msg_type.name.c_str());
525 uint32_t msg_type_length =
527 std::vector<uint8_t> msg_type_buffer;
528 msg_type_buffer.resize(msg_type_length);
536 NODELET_DEBUG(
"Sending ROS message type to MQTT broker on topic '%s' ...",
538 mqtt::message_ptr mqtt_msg =
539 mqtt::make_message(mqtt_topic, msg_type_buffer.data(),
540 msg_type_buffer.size(),
ros2mqtt.mqtt.qos,
true);
542 }
catch (
const mqtt::exception& e) {
544 "Publishing ROS message type information to MQTT topic '%s' failed: %s",
545 mqtt_topic.c_str(), e.what());
551 uint32_t payload_length = msg_length;
552 uint32_t stamp_length =
554 uint32_t msg_offset = 0;
557 msg_offset += stamp_length;
558 payload_length += stamp_length;
559 payload_buffer.resize(payload_length);
562 payload_buffer.resize(payload_length);
578 "Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
593 "Sending ROS message of type '%s' to MQTT broker on topic '%s' ...",
594 ros_msg->getDataType().c_str(), mqtt_topic.c_str());
595 mqtt::message_ptr mqtt_msg = mqtt::make_message(
596 mqtt_topic, payload_buffer.data(), payload_buffer.size(),
599 }
catch (
const mqtt::exception& e) {
601 "Publishing ROS message type information to MQTT topic '%s' failed: %s",
602 mqtt_topic.c_str(), e.what());
610 std::string mqtt_topic = mqtt_msg->get_topic();
612 auto& payload = mqtt_msg->get_payload_ref();
613 uint32_t payload_length =
static_cast<uint32_t
>(payload.size());
617 uint32_t msg_length = payload_length;
618 uint32_t msg_offset = 0;
624 char* non_const_payload =
const_cast<char*
>(&payload[msg_offset]);
625 uint8_t* stamp_buffer =
reinterpret_cast<uint8_t*
>(non_const_payload);
637 "Cannot compute latency for MQTT topic %s when ROS time is 0, is a ROS "
641 std_msgs::Float64 latency_msg;
642 latency_msg.data = latency.
toSec();
645 if (
mqtt2ros.ros.latency_publisher.getTopic().empty()) {
650 mqtt2ros.ros.latency_publisher.publish(latency_msg);
652 msg_length -= stamp_length;
653 msg_offset += stamp_length;
657 char* non_const_payload =
const_cast<char*
>(&payload[msg_offset]);
658 uint8_t* msg_buffer =
reinterpret_cast<uint8_t*
>(non_const_payload);
663 "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
664 mqtt2ros.ros.shape_shifter.getDataType().c_str(),
666 mqtt2ros.ros.shape_shifter.read(msg_stream);
673 std::string mqtt_topic = mqtt_msg->get_topic();
675 const std::string str_msg = mqtt_msg->to_string();
677 bool found_primitive =
false;
678 std::string msg_type_md5;
679 std::string msg_type_name;
680 std::string msg_type_definition;
681 std::vector<uint8_t> msg_buffer;
684 if (!found_primitive) {
685 std::string bool_str = str_msg;
686 std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(),
688 if (bool_str ==
"true" || bool_str ==
"false") {
690 bool bool_msg = (bool_str ==
"true");
700 msg_type_definition =
703 found_primitive =
true;
708 if (!found_primitive) {
711 const int int_msg = std::stoi(str_msg, &pos);
712 if (pos == str_msg.size()) {
722 msg_type_definition =
725 found_primitive =
true;
727 }
catch (
const std::invalid_argument& ex) {
728 }
catch (
const std::out_of_range& ex) {
733 if (!found_primitive) {
736 const float float_msg = std::stof(str_msg, &pos);
737 if (pos == str_msg.size()) {
740 std_msgs::Float32 msg;
741 msg.data = float_msg;
748 msg_type_definition =
751 found_primitive =
true;
753 }
catch (
const std::invalid_argument& ex) {
754 }
catch (
const std::out_of_range& ex) {
759 if (!found_primitive) {
762 std_msgs::String msg;
769 msg_type_definition =
774 if (msg_type_md5 !=
mqtt2ros.ros.shape_shifter.getMD5Sum()) {
777 mqtt2ros.ros.shape_shifter.morph(msg_type_md5, msg_type_name,
778 msg_type_definition,
"");
786 NODELET_INFO(
"ROS publisher message type on topic '%s' set to '%s'",
787 mqtt2ros.ros.topic.c_str(), msg_type_name.c_str());
793 "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
794 mqtt2ros.ros.shape_shifter.getDataType().c_str(),
796 mqtt2ros.ros.shape_shifter.read(msg_stream);
804 std::string as_client =
809 client_->get_server_uri().c_str(), as_client.c_str());
814 std::string mqtt_topic = mqtt2ros_p.first;
818 NODELET_DEBUG(
"Subscribed MQTT topic '%s'", mqtt_topic.c_str());
825 NODELET_ERROR(
"Connection to broker lost, will try to reconnect...");
838 mqtt_client_interfaces::IsConnected::Request& request,
839 mqtt_client_interfaces::IsConnected::Response& response) {
851 std::string mqtt_topic = mqtt_msg->get_topic();
852 NODELET_DEBUG(
"Received MQTT message on topic '%s'", mqtt_topic.c_str());
864 bool msg_contains_ros_msg_type =
866 if (msg_contains_ros_msg_type) {
869 auto& payload = mqtt_msg->get_payload_ref();
870 uint32_t payload_length =
static_cast<uint32_t
>(payload.size());
871 char* non_const_payload =
const_cast<char*
>(&payload[0]);
872 uint8_t* msg_type_buffer =
reinterpret_cast<uint8_t*
>(non_const_payload);
875 mqtt_client_interfaces::RosMsgType ros_msg_type;
882 "Failed to deserialize ROS message type from MQTT message received on "
883 "topic '%s', got message:\n%s",
884 mqtt_topic.c_str(), mqtt_msg->to_string().c_str());
889 std::string mqtt_data_topic = mqtt_topic;
895 if (ros_msg_type.md5 !=
mqtt2ros.ros.shape_shifter.getMD5Sum()) {
897 mqtt2ros.stamped = ros_msg_type.stamped;
898 NODELET_INFO(
"ROS publisher message type on topic '%s' set to '%s'",
899 mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
902 mqtt2ros.ros.shape_shifter.morph(ros_msg_type.md5, ros_msg_type.name,
903 ros_msg_type.definition,
"");
913 NODELET_DEBUG(
"Subscribed MQTT topic '%s'", mqtt_data_topic.c_str());
919 if (!
mqtt2ros_[mqtt_topic].
ros.publisher.getTopic().empty()) {
923 "ROS publisher for data from MQTT topic '%s' is not yet initialized: "
924 "ROS message type not yet known",
943 "Connection to broker failed (return code %d), will automatically "
945 token.get_return_code());