MqttClient.cpp
Go to the documentation of this file.
1 /*
2 ==============================================================================
3 MIT License
4 
5 Copyright 2022 Institute for Automotive Engineering of RWTH Aachen University.
6 
7 Permission is hereby granted, free of charge, to any person obtaining a copy
8 of this software and associated documentation files (the "Software"), to deal
9 in the Software without restriction, including without limitation the rights
10 to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
11 copies of the Software, and to permit persons to whom the Software is
12 furnished to do so, subject to the following conditions:
13 
14 The above copyright notice and this permission notice shall be included in all
15 copies or substantial portions of the Software.
16 
17 THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
18 IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
19 FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
20 AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
21 LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
22 OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
23 SOFTWARE.
24 ==============================================================================
25 */
26 
27 
28 #include <algorithm>
29 #include <cstdint>
30 #include <cstring>
31 #include <vector>
32 
33 #include <mqtt_client/MqttClient.h>
34 #include <mqtt_client_interfaces/RosMsgType.h>
36 #include <ros/message_traits.h>
37 #include <std_msgs/Bool.h>
38 #include <std_msgs/Char.h>
39 #include <std_msgs/Float32.h>
40 #include <std_msgs/Float64.h>
41 #include <std_msgs/Int16.h>
42 #include <std_msgs/Int32.h>
43 #include <std_msgs/Int64.h>
44 #include <std_msgs/Int8.h>
45 #include <std_msgs/String.h>
46 #include <std_msgs/UInt16.h>
47 #include <std_msgs/UInt32.h>
48 #include <std_msgs/UInt64.h>
49 #include <std_msgs/UInt8.h>
51 #include <xmlrpcpp/XmlRpcValue.h>
52 
53 
55 
56 
57 namespace mqtt_client {
58 
59 
61  "mqtt_client/ros_msg_type/";
62 
63 const std::string MqttClient::kLatencyRosTopicPrefix = "latencies/";
64 
65 
96  std::string& primitive) {
97 
98  bool found_primitive = true;
99  const std::string& msg_type_md5 = msg->getMD5Sum();
100 
102  primitive = msg->instantiate<std_msgs::String>()->data;
103  } else if (msg_type_md5 ==
105  primitive = msg->instantiate<std_msgs::Bool>()->data ? "true" : "false";
106  } else if (msg_type_md5 ==
108  primitive = std::to_string(msg->instantiate<std_msgs::Char>()->data);
109  } else if (msg_type_md5 ==
111  primitive = std::to_string(msg->instantiate<std_msgs::UInt8>()->data);
112  } else if (msg_type_md5 ==
114  primitive = std::to_string(msg->instantiate<std_msgs::UInt16>()->data);
115  } else if (msg_type_md5 ==
117  primitive = std::to_string(msg->instantiate<std_msgs::UInt32>()->data);
118  } else if (msg_type_md5 ==
120  primitive = std::to_string(msg->instantiate<std_msgs::UInt64>()->data);
121  } else if (msg_type_md5 ==
123  primitive = std::to_string(msg->instantiate<std_msgs::Int8>()->data);
124  } else if (msg_type_md5 ==
126  primitive = std::to_string(msg->instantiate<std_msgs::Int16>()->data);
127  } else if (msg_type_md5 ==
129  primitive = std::to_string(msg->instantiate<std_msgs::Int32>()->data);
130  } else if (msg_type_md5 ==
132  primitive = std::to_string(msg->instantiate<std_msgs::Int64>()->data);
133  } else if (msg_type_md5 ==
135  primitive = std::to_string(msg->instantiate<std_msgs::Float32>()->data);
136  } else if (msg_type_md5 ==
138  primitive = std::to_string(msg->instantiate<std_msgs::Float64>()->data);
139  } else {
140  found_primitive = false;
141  }
142 
143  return found_primitive;
144 }
145 
146 
148 
149  // get nodehandles
150  node_handle_ = this->getMTNodeHandle();
152 
153  loadParameters();
154  setup();
155 }
156 
157 
159 
160  // load broker parameters from parameter server
161  std::string broker_tls_ca_certificate;
162  loadParameter("broker/host", broker_config_.host, "localhost");
163  loadParameter("broker/port", broker_config_.port, 1883);
164  if (loadParameter("broker/user", broker_config_.user)) {
165  loadParameter("broker/pass", broker_config_.pass, "");
166  }
167  if (loadParameter("broker/tls/enabled", broker_config_.tls.enabled, false)) {
168  loadParameter("broker/tls/ca_certificate", broker_tls_ca_certificate,
169  "/etc/ssl/certs/ca-certificates.crt");
170  }
171 
172  // load client parameters from parameter server
173  std::string client_buffer_directory, client_tls_certificate, client_tls_key;
174  loadParameter("client/id", client_config_.id, "");
177  loadParameter("client/buffer/size", client_config_.buffer.size, 0);
178  loadParameter("client/buffer/directory", client_buffer_directory, "buffer");
179  } else {
180  NODELET_WARN("Client buffer can not be enabled when client ID is empty");
181  }
182  if (loadParameter("client/last_will/topic", client_config_.last_will.topic)) {
183  loadParameter("client/last_will/message", client_config_.last_will.message,
184  "offline");
185  loadParameter("client/last_will/qos", client_config_.last_will.qos, 0);
186  loadParameter("client/last_will/retained",
188  }
189  loadParameter("client/clean_session", client_config_.clean_session, true);
190  loadParameter("client/keep_alive_interval",
192  loadParameter("client/max_inflight", client_config_.max_inflight, 65535);
193  if (broker_config_.tls.enabled) {
194  if (loadParameter("client/tls/certificate", client_tls_certificate)) {
195  loadParameter("client/tls/key", client_tls_key);
196  loadParameter("client/tls/password", client_config_.tls.password);
197  loadParameter("client/tls/version", client_config_.tls.version);
198  loadParameter("client/tls/verify", client_config_.tls.verify);
199  loadParameter("client/tls/alpn_protos", client_config_.tls.alpn_protos);
200  }
201  }
202 
203  // resolve filepaths
204  broker_config_.tls.ca_certificate = resolvePath(broker_tls_ca_certificate);
205  client_config_.buffer.directory = resolvePath(client_buffer_directory);
206  client_config_.tls.certificate = resolvePath(client_tls_certificate);
207  client_config_.tls.key = resolvePath(client_tls_key);
208 
209  // load bridge parameters from parameter server
210  XmlRpc::XmlRpcValue bridge;
211  if (!private_node_handle_.getParam("bridge", bridge)) {
212  NODELET_ERROR("Parameter 'bridge' is required");
213  exit(EXIT_FAILURE);
214  }
215 
216  // parse bridge parameters
217  try {
218 
219  // ros2mqtt
220  if (bridge.hasMember("ros2mqtt")) {
221 
222  // loop over array
223  XmlRpc::XmlRpcValue& ros2mqtt_params = bridge["ros2mqtt"];
224  for (int k = 0; k < ros2mqtt_params.size(); k++) {
225 
226  if (ros2mqtt_params[k].hasMember("ros_topic") &&
227  ros2mqtt_params[k].hasMember("mqtt_topic")) {
228 
229  // ros2mqtt[k]/ros_topic and ros2mqtt[k]/mqtt_topic
230  std::string& ros_topic = ros2mqtt_params[k]["ros_topic"];
231  Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
232  ros2mqtt.mqtt.topic = std::string(ros2mqtt_params[k]["mqtt_topic"]);
233 
234  // ros2mqtt[k]/primitive
235  if (ros2mqtt_params[k].hasMember("primitive"))
236  ros2mqtt.primitive = ros2mqtt_params[k]["primitive"];
237 
238  // ros2mqtt[k]/inject_timestamp
239  if (ros2mqtt_params[k].hasMember("inject_timestamp"))
240  ros2mqtt.stamped = ros2mqtt_params[k]["inject_timestamp"];
241  if (ros2mqtt.stamped && ros2mqtt.primitive) {
242  NODELET_WARN(
243  "Timestamp will not be injected into primitive messages on ROS "
244  "topic '%s'",
245  ros_topic.c_str());
246  ros2mqtt.stamped = false;
247  }
248 
249  // ros2mqtt[k]/advanced
250  if (ros2mqtt_params[k].hasMember("advanced")) {
251  XmlRpc::XmlRpcValue& advanced_params =
252  ros2mqtt_params[k]["advanced"];
253  if (advanced_params.hasMember("ros")) {
254  if (advanced_params["ros"].hasMember("queue_size"))
255  ros2mqtt.ros.queue_size = advanced_params["ros"]["queue_size"];
256  }
257  if (advanced_params.hasMember("mqtt")) {
258  if (advanced_params["mqtt"].hasMember("qos"))
259  ros2mqtt.mqtt.qos = advanced_params["mqtt"]["qos"];
260  if (advanced_params["mqtt"].hasMember("retained"))
261  ros2mqtt.mqtt.retained = advanced_params["mqtt"]["retained"];
262  }
263  }
264 
265  NODELET_INFO("Bridging %sROS topic '%s' to MQTT topic '%s' %s",
266  ros2mqtt.primitive ? "primitive " : "",
267  ros_topic.c_str(), ros2mqtt.mqtt.topic.c_str(),
268  ros2mqtt.stamped ? "and measuring latency" : "");
269  } else {
270  NODELET_WARN(
271  "Parameter 'bridge/ros2mqtt[%d]' is missing subparameter "
272  "'ros_topic' or 'mqtt_topic', will be ignored",
273  k);
274  }
275  }
276  }
277 
278  // mqtt2ros
279  if (bridge.hasMember("mqtt2ros")) {
280 
281  // loop over array
282  XmlRpc::XmlRpcValue& mqtt2ros_params = bridge["mqtt2ros"];
283  for (int k = 0; k < mqtt2ros_params.size(); k++) {
284 
285 
286  if (mqtt2ros_params[k].hasMember("mqtt_topic") &&
287  mqtt2ros_params[k].hasMember("ros_topic")) {
288 
289  // mqtt2ros[k]/mqtt_topic and mqtt2ros[k]/ros_topic
290  std::string& mqtt_topic = mqtt2ros_params[k]["mqtt_topic"];
291  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
292  mqtt2ros.ros.topic = std::string(mqtt2ros_params[k]["ros_topic"]);
293 
294  // mqtt2ros[k]/primitive
295  if (mqtt2ros_params[k].hasMember("primitive"))
296  mqtt2ros.primitive = mqtt2ros_params[k]["primitive"];
297 
298  // mqtt2ros[k]/advanced
299  if (mqtt2ros_params[k].hasMember("advanced")) {
300  XmlRpc::XmlRpcValue& advanced_params =
301  mqtt2ros_params[k]["advanced"];
302  if (advanced_params.hasMember("mqtt")) {
303  if (advanced_params["mqtt"].hasMember("qos"))
304  mqtt2ros.mqtt.qos = advanced_params["mqtt"]["qos"];
305  }
306  if (advanced_params.hasMember("ros")) {
307  if (advanced_params["ros"].hasMember("queue_size"))
308  mqtt2ros.ros.queue_size = advanced_params["ros"]["queue_size"];
309  if (advanced_params["ros"].hasMember("latched"))
310  mqtt2ros.ros.latched = advanced_params["ros"]["latched"];
311  }
312  }
313 
314  NODELET_INFO(
315  "Bridging MQTT topic '%s' to %sROS topic '%s'", mqtt_topic.c_str(),
316  mqtt2ros.primitive ? "primitive " : "", mqtt2ros.ros.topic.c_str());
317  } else {
318  NODELET_WARN(
319  "Parameter 'bridge/mqtt2ros[%d]' is missing subparameter "
320  "'mqtt_topic' or 'ros_topic', will be ignored",
321  k);
322  }
323  }
324  }
325 
326  if (ros2mqtt_.empty() && mqtt2ros_.empty()) {
327  NODELET_ERROR("No valid ROS-MQTT bridge found in parameter 'bridge'");
328  exit(EXIT_FAILURE);
329  }
330 
331  } catch (XmlRpc::XmlRpcException e) {
332  NODELET_ERROR("Parameter 'bridge' could not be parsed (XmlRpc %d: %s)",
333  e.getCode(), e.getMessage().c_str());
334  exit(EXIT_FAILURE);
335  }
336 }
337 
338 
339 bool MqttClient::loadParameter(const std::string& key, std::string& value) {
340  bool found = private_node_handle_.getParam(key, value);
341  if (found)
342  NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
343  value.c_str());
344  return found;
345 }
346 
347 
348 bool MqttClient::loadParameter(const std::string& key, std::string& value,
349  const std::string& default_value) {
350  bool found =
351  private_node_handle_.param<std::string>(key, value, default_value);
352  if (!found) {
354  NODELET_ERROR("Parameter '%s' has wrong data type", key.c_str());
355  NODELET_WARN("Parameter '%s' not set, defaulting to '%s'", key.c_str(),
356  default_value.c_str());
357  }
358  if (found)
359  NODELET_DEBUG("Retrieved parameter '%s' = '%s'", key.c_str(),
360  value.c_str());
361  return found;
362 }
363 
364 
365 std::filesystem::path MqttClient::resolvePath(const std::string& path_string) {
366 
367  std::filesystem::path path(path_string);
368  if (path_string.empty()) return path;
369  if (!path.has_root_path()) {
370  std::string ros_home;
371  ros::get_environment_variable(ros_home, "ROS_HOME");
372  if (ros_home.empty())
373  ros_home = std::string(std::filesystem::current_path());
374  path = std::filesystem::path(ros_home);
375  path.append(path_string);
376  }
377  if (!std::filesystem::exists(path))
378  NODELET_WARN("Requested path '%s' does not exist",
379  std::string(path).c_str());
380  return path;
381 }
382 
383 
385 
386  // initialize MQTT client
387  setupClient();
388 
389  // connect to MQTT broker
390  connect();
391 
392  // create ROS service server
394  "is_connected", &MqttClient::isConnectedService, this);
395 
396  // create ROS subscribers
397  for (auto& ros2mqtt_p : ros2mqtt_) {
398  const std::string& ros_topic = ros2mqtt_p.first;
399  Ros2MqttInterface& ros2mqtt = ros2mqtt_p.second;
400  const std::string& mqtt_topic = ros2mqtt.mqtt.topic;
401  ros2mqtt.ros.subscriber =
403  ros_topic, ros2mqtt.ros.queue_size,
404  boost::bind(&MqttClient::ros2mqtt, this, _1, ros_topic));
405  NODELET_DEBUG("Subscribed ROS topic '%s'", ros_topic.c_str());
406  }
407 }
408 
409 
411 
412  // basic client connection options
413  connect_options_.set_automatic_reconnect(true);
414  connect_options_.set_clean_session(client_config_.clean_session);
415  connect_options_.set_keep_alive_interval(client_config_.keep_alive_interval);
416  connect_options_.set_max_inflight(client_config_.max_inflight);
417 
418  // user authentication
419  if (!broker_config_.user.empty()) {
420  connect_options_.set_user_name(broker_config_.user);
421  connect_options_.set_password(broker_config_.pass);
422  }
423 
424  // last will
425  if (!client_config_.last_will.topic.empty()) {
426  mqtt::will_options will(
429  connect_options_.set_will(will);
430  }
431 
432  // SSL/TLS
433  if (broker_config_.tls.enabled) {
434  mqtt::ssl_options ssl;
435  ssl.set_trust_store(broker_config_.tls.ca_certificate);
436  if (!client_config_.tls.certificate.empty() &&
437  !client_config_.tls.key.empty()) {
438  ssl.set_key_store(client_config_.tls.certificate);
439  ssl.set_private_key(client_config_.tls.key);
440  if (!client_config_.tls.password.empty())
441  ssl.set_private_key_password(client_config_.tls.password);
442  }
443  ssl.set_ssl_version(client_config_.tls.version);
444  ssl.set_verify(client_config_.tls.verify);
445  ssl.set_alpn_protos(client_config_.tls.alpn_protos);
446  connect_options_.set_ssl(ssl);
447  }
448 
449  // create MQTT client
450  const std::string protocol = broker_config_.tls.enabled ? "ssl" : "tcp";
451  const std::string uri = fmt::format("{}://{}:{}", protocol, broker_config_.host,
453  try {
455  client_ = std::shared_ptr<mqtt::async_client>(new mqtt::async_client(
458  } else {
459  client_ = std::shared_ptr<mqtt::async_client>(
460  new mqtt::async_client(uri, client_config_.id));
461  }
462  } catch (const mqtt::exception& e) {
463  ROS_ERROR("Client could not be initialized: %s", e.what());
464  exit(EXIT_FAILURE);
465  }
466 
467  // setup MQTT callbacks
468  client_->set_callback(*this);
469 }
470 
471 
473 
474  std::string as_client =
475  client_config_.id.empty()
476  ? ""
477  : std::string(" as '") + client_config_.id + std::string("'");
478  NODELET_INFO("Connecting to broker at '%s'%s ...",
479  client_->get_server_uri().c_str(), as_client.c_str());
480 
481  try {
482  client_->connect(connect_options_, nullptr, *this);
483  } catch (const mqtt::exception& e) {
484  ROS_ERROR("Connection to broker failed: %s", e.what());
485  exit(EXIT_FAILURE);
486  }
487 }
488 
489 
491  const std::string& ros_topic) {
492 
493  Ros2MqttInterface& ros2mqtt = ros2mqtt_[ros_topic];
494  std::string mqtt_topic = ros2mqtt.mqtt.topic;
495  std::vector<uint8_t> payload_buffer;
496 
497  // gather information on ROS message type in special ROS message
498  mqtt_client_interfaces::RosMsgType ros_msg_type;
499  ros_msg_type.md5 = ros_msg->getMD5Sum();
500  ros_msg_type.name = ros_msg->getDataType();
501  ros_msg_type.definition = ros_msg->getMessageDefinition();
502  ros_msg_type.stamped = ros2mqtt.stamped;
503 
504  NODELET_DEBUG("Received ROS message of type '%s' on topic '%s'",
505  ros_msg_type.name.c_str(), ros_topic.c_str());
506 
507  if (ros2mqtt.primitive) { // publish as primitive (string) message
508 
509  // resolve ROS messages to primitive strings if possible
510  std::string payload;
511  bool found_primitive = primitiveRosMessageToString(ros_msg, payload);
512  if (found_primitive) {
513  payload_buffer = std::vector<uint8_t>(payload.begin(), payload.end());
514  } else {
515  NODELET_WARN(
516  "Cannot send ROS message of type '%s' as primitive message, "
517  "check supported primitive types",
518  ros_msg_type.name.c_str());
519  return;
520  }
521 
522  } else { // publish as complete ROS message incl. ROS message type
523 
524  // serialize ROS message type to buffer
525  uint32_t msg_type_length =
527  std::vector<uint8_t> msg_type_buffer;
528  msg_type_buffer.resize(msg_type_length);
529  ros::serialization::OStream msg_type_stream(msg_type_buffer.data(),
530  msg_type_length);
531  ros::serialization::serialize(msg_type_stream, ros_msg_type);
532 
533  // send ROS message type information to MQTT broker
534  mqtt_topic = kRosMsgTypeMqttTopicPrefix + ros2mqtt.mqtt.topic;
535  try {
536  NODELET_DEBUG("Sending ROS message type to MQTT broker on topic '%s' ...",
537  mqtt_topic.c_str());
538  mqtt::message_ptr mqtt_msg =
539  mqtt::make_message(mqtt_topic, msg_type_buffer.data(),
540  msg_type_buffer.size(), ros2mqtt.mqtt.qos, true);
541  client_->publish(mqtt_msg);
542  } catch (const mqtt::exception& e) {
543  NODELET_WARN(
544  "Publishing ROS message type information to MQTT topic '%s' failed: %s",
545  mqtt_topic.c_str(), e.what());
546  }
547 
548  // build MQTT payload for ROS message (R) as [R]
549  // or [S, R] if timestamp (S) is included
550  uint32_t msg_length = ros::serialization::serializationLength(*ros_msg);
551  uint32_t payload_length = msg_length;
552  uint32_t stamp_length =
554  uint32_t msg_offset = 0;
555  if (ros2mqtt.stamped) {
556  // allocate buffer with appropriate size to hold [S, R]
557  msg_offset += stamp_length;
558  payload_length += stamp_length;
559  payload_buffer.resize(payload_length);
560  } else {
561  // allocate buffer with appropriate size to hold [R]
562  payload_buffer.resize(payload_length);
563  }
564 
565  // serialize ROS message to payload [-, R]
566  ros::serialization::OStream msg_stream(&payload_buffer[msg_offset],
567  msg_length);
568  ros::serialization::serialize(msg_stream, *ros_msg);
569 
570  // inject timestamp as final step
571  if (ros2mqtt.stamped) {
572 
573  // take current timestamp
574  ros::WallTime stamp_wall = ros::WallTime::now();
575  ros::Time stamp(stamp_wall.sec, stamp_wall.nsec);
576  if (stamp.isZero())
577  NODELET_WARN(
578  "Injected ROS time 0 into MQTT payload on topic %s, is a ROS clock "
579  "running?",
580  ros2mqtt.mqtt.topic.c_str());
581 
582  // serialize timestamp to payload [S, R]
583  ros::serialization::OStream stamp_stream(&payload_buffer[0],
584  stamp_length);
585  ros::serialization::serialize(stamp_stream, stamp);
586  }
587  }
588 
589  // send ROS message to MQTT broker
590  mqtt_topic = ros2mqtt.mqtt.topic;
591  try {
593  "Sending ROS message of type '%s' to MQTT broker on topic '%s' ...",
594  ros_msg->getDataType().c_str(), mqtt_topic.c_str());
595  mqtt::message_ptr mqtt_msg = mqtt::make_message(
596  mqtt_topic, payload_buffer.data(), payload_buffer.size(),
597  ros2mqtt.mqtt.qos, ros2mqtt.mqtt.retained);
598  client_->publish(mqtt_msg);
599  } catch (const mqtt::exception& e) {
600  NODELET_WARN(
601  "Publishing ROS message type information to MQTT topic '%s' failed: %s",
602  mqtt_topic.c_str(), e.what());
603  }
604 }
605 
606 
607 void MqttClient::mqtt2ros(mqtt::const_message_ptr mqtt_msg,
608  const ros::WallTime& arrival_stamp) {
609 
610  std::string mqtt_topic = mqtt_msg->get_topic();
611  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
612  auto& payload = mqtt_msg->get_payload_ref();
613  uint32_t payload_length = static_cast<uint32_t>(payload.size());
614 
615  // read MQTT payload for ROS message (R) as [R]
616  // or [S, R] if timestamp (S) is included
617  uint32_t msg_length = payload_length;
618  uint32_t msg_offset = 0;
619 
620  // if stamped, compute latency
621  if (mqtt2ros.stamped) {
622 
623  // create ROS message buffer on top of MQTT message payload
624  char* non_const_payload = const_cast<char*>(&payload[msg_offset]);
625  uint8_t* stamp_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
626 
627  // deserialize stamp
628  ros::Time stamp;
629  uint32_t stamp_length = ros::serialization::serializationLength(stamp);
630  ros::serialization::IStream stamp_stream(stamp_buffer, stamp_length);
631  ros::serialization::deserialize(stamp_stream, stamp);
632 
633  // compute ROS2MQTT latency
634  ros::Time now(arrival_stamp.sec, arrival_stamp.nsec);
635  if (now.isZero())
636  NODELET_WARN(
637  "Cannot compute latency for MQTT topic %s when ROS time is 0, is a ROS "
638  "clock running?",
639  mqtt_topic.c_str());
640  ros::Duration latency = now - stamp;
641  std_msgs::Float64 latency_msg;
642  latency_msg.data = latency.toSec();
643 
644  // publish latency
645  if (mqtt2ros.ros.latency_publisher.getTopic().empty()) {
646  std::string latency_topic = kLatencyRosTopicPrefix + mqtt2ros.ros.topic;
647  mqtt2ros.ros.latency_publisher =
648  private_node_handle_.advertise<std_msgs::Float64>(latency_topic, 1);
649  }
650  mqtt2ros.ros.latency_publisher.publish(latency_msg);
651 
652  msg_length -= stamp_length;
653  msg_offset += stamp_length;
654  }
655 
656  // create ROS message buffer on top of MQTT message payload
657  char* non_const_payload = const_cast<char*>(&payload[msg_offset]);
658  uint8_t* msg_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
659  ros::serialization::OStream msg_stream(msg_buffer, msg_length);
660 
661  // publish via ShapeShifter
663  "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
664  mqtt2ros.ros.shape_shifter.getDataType().c_str(),
665  mqtt2ros.ros.topic.c_str());
666  mqtt2ros.ros.shape_shifter.read(msg_stream);
667  mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
668 }
669 
670 
671 void MqttClient::mqtt2primitive(mqtt::const_message_ptr mqtt_msg) {
672 
673  std::string mqtt_topic = mqtt_msg->get_topic();
674  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
675  const std::string str_msg = mqtt_msg->to_string();
676 
677  bool found_primitive = false;
678  std::string msg_type_md5;
679  std::string msg_type_name;
680  std::string msg_type_definition;
681  std::vector<uint8_t> msg_buffer;
682 
683  // check for bool
684  if (!found_primitive) {
685  std::string bool_str = str_msg;
686  std::transform(str_msg.cbegin(), str_msg.cend(), bool_str.begin(),
687  ::tolower);
688  if (bool_str == "true" || bool_str == "false") {
689 
690  bool bool_msg = (bool_str == "true");
691 
692  // construct and serialize ROS message
693  std_msgs::Bool msg;
694  msg.data = bool_msg;
695  serializeRosMessage(msg, msg_buffer);
696 
697  // collect ROS message type information
700  msg_type_definition =
702 
703  found_primitive = true;
704  }
705  }
706 
707  // check for int
708  if (!found_primitive) {
709  std::size_t pos;
710  try {
711  const int int_msg = std::stoi(str_msg, &pos);
712  if (pos == str_msg.size()) {
713 
714  // construct and serialize ROS message
715  std_msgs::Int32 msg;
716  msg.data = int_msg;
717  serializeRosMessage(msg, msg_buffer);
718 
719  // collect ROS message type information
722  msg_type_definition =
724 
725  found_primitive = true;
726  }
727  } catch (const std::invalid_argument& ex) {
728  } catch (const std::out_of_range& ex) {
729  }
730  }
731 
732  // check for float
733  if (!found_primitive) {
734  std::size_t pos;
735  try {
736  const float float_msg = std::stof(str_msg, &pos);
737  if (pos == str_msg.size()) {
738 
739  // construct and serialize ROS message
740  std_msgs::Float32 msg;
741  msg.data = float_msg;
742  serializeRosMessage(msg, msg_buffer);
743 
744  // collect ROS message type information
746  msg_type_name =
748  msg_type_definition =
750 
751  found_primitive = true;
752  }
753  } catch (const std::invalid_argument& ex) {
754  } catch (const std::out_of_range& ex) {
755  }
756  }
757 
758  // fall back to string
759  if (!found_primitive) {
760 
761  // construct and serialize ROS message
762  std_msgs::String msg;
763  msg.data = str_msg;
764  serializeRosMessage(msg, msg_buffer);
765 
766  // collect ROS message type information
769  msg_type_definition =
771  }
772 
773  // if ROS message type has changed
774  if (msg_type_md5 != mqtt2ros.ros.shape_shifter.getMD5Sum()) {
775 
776  // configure ShapeShifter
777  mqtt2ros.ros.shape_shifter.morph(msg_type_md5, msg_type_name,
778  msg_type_definition, "");
779 
780  // advertise with ROS publisher
781  mqtt2ros.ros.publisher.shutdown();
782  mqtt2ros.ros.publisher = mqtt2ros.ros.shape_shifter.advertise(
783  node_handle_, mqtt2ros.ros.topic, mqtt2ros.ros.queue_size,
784  mqtt2ros.ros.latched);
785 
786  NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
787  mqtt2ros.ros.topic.c_str(), msg_type_name.c_str());
788  }
789 
790  // publish via ShapeShifter
791  ros::serialization::OStream msg_stream(msg_buffer.data(), msg_buffer.size());
793  "Sending ROS message of type '%s' from MQTT broker to ROS topic '%s' ...",
794  mqtt2ros.ros.shape_shifter.getDataType().c_str(),
795  mqtt2ros.ros.topic.c_str());
796  mqtt2ros.ros.shape_shifter.read(msg_stream);
797  mqtt2ros.ros.publisher.publish(mqtt2ros.ros.shape_shifter);
798 }
799 
800 
801 void MqttClient::connected(const std::string& cause) {
802 
803  is_connected_ = true;
804  std::string as_client =
805  client_config_.id.empty()
806  ? ""
807  : std::string(" as '") + client_config_.id + std::string("'");
808  NODELET_INFO("Connected to broker at '%s'%s",
809  client_->get_server_uri().c_str(), as_client.c_str());
810 
811  // subscribe MQTT topics
812  for (auto& mqtt2ros_p : mqtt2ros_) {
813  Mqtt2RosInterface& mqtt2ros = mqtt2ros_p.second;
814  std::string mqtt_topic = mqtt2ros_p.first;
815  if (!mqtt2ros.primitive) // subscribe topics for ROS message types first
816  mqtt_topic = kRosMsgTypeMqttTopicPrefix + mqtt_topic;
817  client_->subscribe(mqtt_topic, mqtt2ros.mqtt.qos);
818  NODELET_DEBUG("Subscribed MQTT topic '%s'", mqtt_topic.c_str());
819  }
820 }
821 
822 
823 void MqttClient::connection_lost(const std::string& cause) {
824 
825  NODELET_ERROR("Connection to broker lost, will try to reconnect...");
826  is_connected_ = false;
827  connect();
828 }
829 
830 
832 
833  return is_connected_;
834 }
835 
836 
838  mqtt_client_interfaces::IsConnected::Request& request,
839  mqtt_client_interfaces::IsConnected::Response& response) {
840 
841  response.connected = isConnected();
842  return true;
843 }
844 
845 
846 void MqttClient::message_arrived(mqtt::const_message_ptr mqtt_msg) {
847 
848  // instantly take arrival timestamp
849  ros::WallTime arrival_stamp = ros::WallTime::now();
850 
851  std::string mqtt_topic = mqtt_msg->get_topic();
852  NODELET_DEBUG("Received MQTT message on topic '%s'", mqtt_topic.c_str());
853 
854  // publish directly if primitive
855  if (mqtt2ros_.count(mqtt_topic) > 0) {
856  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_topic];
857  if (mqtt2ros.primitive) {
858  mqtt2primitive(mqtt_msg);
859  return;
860  }
861  }
862 
863  // else first check for ROS message type
864  bool msg_contains_ros_msg_type =
865  mqtt_topic.find(kRosMsgTypeMqttTopicPrefix) != std::string::npos;
866  if (msg_contains_ros_msg_type) {
867 
868  // create ROS message buffer on top of MQTT message payload
869  auto& payload = mqtt_msg->get_payload_ref();
870  uint32_t payload_length = static_cast<uint32_t>(payload.size());
871  char* non_const_payload = const_cast<char*>(&payload[0]);
872  uint8_t* msg_type_buffer = reinterpret_cast<uint8_t*>(non_const_payload);
873 
874  // deserialize ROS message type
875  mqtt_client_interfaces::RosMsgType ros_msg_type;
876  ros::serialization::IStream msg_type_stream(msg_type_buffer,
877  payload_length);
878  try {
879  ros::serialization::deserialize(msg_type_stream, ros_msg_type);
882  "Failed to deserialize ROS message type from MQTT message received on "
883  "topic '%s', got message:\n%s",
884  mqtt_topic.c_str(), mqtt_msg->to_string().c_str());
885  return;
886  }
887 
888  // reconstruct corresponding MQTT data topic
889  std::string mqtt_data_topic = mqtt_topic;
890  mqtt_data_topic.erase(mqtt_data_topic.find(kRosMsgTypeMqttTopicPrefix),
891  kRosMsgTypeMqttTopicPrefix.length());
892  Mqtt2RosInterface& mqtt2ros = mqtt2ros_[mqtt_data_topic];
893 
894  // if ROS message type has changed
895  if (ros_msg_type.md5 != mqtt2ros.ros.shape_shifter.getMD5Sum()) {
896 
897  mqtt2ros.stamped = ros_msg_type.stamped;
898  NODELET_INFO("ROS publisher message type on topic '%s' set to '%s'",
899  mqtt2ros.ros.topic.c_str(), ros_msg_type.name.c_str());
900 
901  // configure ShapeShifter
902  mqtt2ros.ros.shape_shifter.morph(ros_msg_type.md5, ros_msg_type.name,
903  ros_msg_type.definition, "");
904 
905  // advertise with ROS publisher
906  mqtt2ros.ros.publisher.shutdown();
907  mqtt2ros.ros.publisher = mqtt2ros.ros.shape_shifter.advertise(
908  node_handle_, mqtt2ros.ros.topic, mqtt2ros.ros.queue_size,
909  mqtt2ros.ros.latched);
910 
911  // subscribe to MQTT topic with actual ROS messages
912  client_->subscribe(mqtt_data_topic, mqtt2ros.mqtt.qos);
913  NODELET_DEBUG("Subscribed MQTT topic '%s'", mqtt_data_topic.c_str());
914  }
915 
916  } else {
917 
918  // publish ROS message, if publisher initialized
919  if (!mqtt2ros_[mqtt_topic].ros.publisher.getTopic().empty()) {
920  mqtt2ros(mqtt_msg, arrival_stamp);
921  } else {
922  NODELET_WARN(
923  "ROS publisher for data from MQTT topic '%s' is not yet initialized: "
924  "ROS message type not yet known",
925  mqtt_topic.c_str());
926  }
927  }
928 }
929 
930 
931 void MqttClient::delivery_complete(mqtt::delivery_token_ptr token) {}
932 
933 
934 void MqttClient::on_success(const mqtt::token& token) {
935 
936  is_connected_ = true;
937 }
938 
939 
940 void MqttClient::on_failure(const mqtt::token& token) {
941 
942  ROS_ERROR(
943  "Connection to broker failed (return code %d), will automatically "
944  "retry...",
945  token.get_return_code());
946  is_connected_ = false;
947 }
948 
949 } // namespace mqtt_client
XmlRpc::XmlRpcValue::size
int size() const
mqtt_client::MqttClient::ClientConfig::qos
int qos
last-will QoS value
Definition: MqttClient.h:357
mqtt_client::MqttClient::ClientConfig::size
int size
client buffer size
Definition: MqttClient.h:351
response
const std::string response
mqtt_client::MqttClient::mqtt2ros
void mqtt2ros(mqtt::const_message_ptr mqtt_msg, const ros::WallTime &arrival_stamp=ros::WallTime::now())
Publishes a ROS message received via MQTT to ROS.
Definition: MqttClient.cpp:607
mqtt_client::MqttClient::ClientConfig::password
std::string password
decryption password for private key
Definition: MqttClient.h:366
ros::serialization::OStream
mqtt_client::MqttClient::setupClient
void setupClient()
Sets up the client connection options and initializes the client object.
Definition: MqttClient.cpp:410
mqtt_client::MqttClient::on_success
void on_success(const mqtt::token &token) override
Callback for when a MQTT action succeeds.
Definition: MqttClient.cpp:934
NODELET_ERROR
#define NODELET_ERROR(...)
mqtt_client::MqttClient::resolvePath
std::filesystem::path resolvePath(const std::string &path_string)
Converts a string to a path object resolving paths relative to ROS_HOME.
Definition: MqttClient.cpp:365
ros::serialization::deserialize
void deserialize(Stream &stream, boost::array< T, N > &t)
mqtt_client::MqttClient::mqtt2primitive
void mqtt2primitive(mqtt::const_message_ptr mqtt_msg)
Publishes a primitive message received via MQTT to ROS.
Definition: MqttClient.cpp:671
boost::shared_ptr
mqtt_client::MqttClient::on_failure
void on_failure(const mqtt::token &token) override
Callback for when a MQTT action fails.
Definition: MqttClient.cpp:940
XmlRpc::XmlRpcException::getMessage
const std::string & getMessage() const
mqtt_client::MqttClient::ClientConfig::verify
bool verify
Definition: MqttClient.h:368
ros
ros::NodeHandle::getParam
bool getParam(const std::string &key, bool &b) const
mqtt_client::MqttClient::delivery_complete
void delivery_complete(mqtt::delivery_token_ptr token) override
Callback for when delivery for a MQTT message has been completed.
Definition: MqttClient.cpp:931
mqtt_client::MqttClient::is_connected_
bool is_connected_
Status variable keeping track of connection status to broker.
Definition: MqttClient.h:445
mqtt_client::MqttClient::ClientConfig::key
std::filesystem::path key
client private keyfile
Definition: MqttClient.h:365
ros::serialization::serializationLength
uint32_t serializationLength(const boost::array< T, N > &t)
nodelet::Nodelet::getMTPrivateNodeHandle
ros::NodeHandle & getMTPrivateNodeHandle() const
NODELET_WARN
#define NODELET_WARN(...)
MqttClient.h
mqtt_client::MqttClient::ClientConfig::max_inflight
int max_inflight
maximum number of inflight messages
Definition: MqttClient.h:362
ros::NodeHandle::advertiseService
ServiceServer advertiseService(AdvertiseServiceOptions &ops)
mqtt_client::MqttClient::ClientConfig::buffer
struct mqtt_client::MqttClient::ClientConfig::@1 buffer
client buffer-related variables
mqtt_client::MqttClient::loadParameter
bool loadParameter(const std::string &key, std::string &value)
Loads requested ROS parameter from parameter server.
Definition: MqttClient.cpp:339
mqtt_client::MqttClient::client_config_
ClientConfig client_config_
Client parameters.
Definition: MqttClient.h:455
mqtt_client::MqttClient::BrokerConfig::pass
std::string pass
password
Definition: MqttClient.h:336
mqtt_client::MqttClient::ClientConfig::tls
struct mqtt_client::MqttClient::ClientConfig::@3 tls
SSL/TLS-related variables.
ros::serialization::IStream
mqtt_client::MqttClient::client_
std::shared_ptr< mqtt::async_client > client_
MQTT client variable.
Definition: MqttClient.h:460
mqtt_client::MqttClient::ClientConfig::message
std::string message
last-will message
Definition: MqttClient.h:356
mqtt_client::MqttClient::ClientConfig::version
int version
TLS version (https://github.com/eclipse/paho.mqtt.cpp/blob/master/src/mqtt/ssl_options....
Definition: MqttClient.h:367
mqtt_client::MqttClient::Mqtt2RosInterface
Struct containing variables related to a MQTT2ROS connection.
Definition: MqttClient.h:394
mqtt_client::MqttClient::ClientConfig::topic
std::string topic
last-will topic
Definition: MqttClient.h:355
ros::NodeHandle::advertise
Publisher advertise(AdvertiseOptions &ops)
mqtt_client::MqttClient::kRosMsgTypeMqttTopicPrefix
static const std::string kRosMsgTypeMqttTopicPrefix
MQTT topic prefix under which ROS message type information is published.
Definition: MqttClient.h:418
mqtt_client::MqttClient::BrokerConfig::user
std::string user
username
Definition: MqttClient.h:335
mqtt_client::MqttClient::ClientConfig::clean_session
bool clean_session
whether client requests clean session
Definition: MqttClient.h:360
mqtt_client::MqttClient::BrokerConfig::port
int port
broker port
Definition: MqttClient.h:334
class_list_macros.h
mqtt_client::MqttClient::ros2mqtt
void ros2mqtt(const topic_tools::ShapeShifter::ConstPtr &ros_msg, const std::string &ros_topic)
Serializes and publishes a generic ROS message to the MQTT broker.
Definition: MqttClient.cpp:490
mqtt_client::MqttClient::ClientConfig::last_will
struct mqtt_client::MqttClient::ClientConfig::@2 last_will
last-will-related variables
PLUGINLIB_EXPORT_CLASS
#define PLUGINLIB_EXPORT_CLASS(class_type, base_class_type)
mqtt_client::MqttClient::ClientConfig::directory
std::filesystem::path directory
client buffer directory
Definition: MqttClient.h:352
message_traits.h
mqtt_client::MqttClient::connect_options_
mqtt::connect_options connect_options_
MQTT client connection options.
Definition: MqttClient.h:465
ros::message_traits::DataType::value
static const char * value()
TimeBase< Time, Duration >::isZero
bool isZero() const
mqtt_client::MqttClient::Ros2MqttInterface
Struct containing variables related to a ROS2MQTT connection.
Definition: MqttClient.h:377
ros::WallTime::now
static WallTime now()
ros::message_traits::MD5Sum::value
static const char * value()
mqtt_client::MqttClient
ROS Nodelet for sending and receiving ROS messages via MQTT.
Definition: MqttClient.h:58
mqtt_client::MqttClient::private_node_handle_
ros::NodeHandle private_node_handle_
Private ROS node handle.
Definition: MqttClient.h:435
XmlRpc::XmlRpcException::getCode
int getCode() const
mqtt_client::MqttClient::ClientConfig::alpn_protos
std::vector< std::string > alpn_protos
list of ALPN protocols
Definition: MqttClient.h:370
ros::NodeHandle::hasParam
bool hasParam(const std::string &key) const
mqtt_client::MqttClient::message_arrived
void message_arrived(mqtt::const_message_ptr mqtt_msg) override
Callback for when the client receives a MQTT message from the broker.
Definition: MqttClient.cpp:846
mqtt_client::MqttClient::connected
void connected(const std::string &cause) override
Callback for when the client has successfully connected to the broker.
Definition: MqttClient.cpp:801
mqtt_client::MqttClient::isConnectedService
bool isConnectedService(mqtt_client_interfaces::IsConnected::Request &request, mqtt_client_interfaces::IsConnected::Response &response)
ROS service returning whether the client is connected to the broker.
Definition: MqttClient.cpp:837
XmlRpcValue.h
ros::message_traits::MD5Sum
ros::NodeHandle::subscribe
Subscriber subscribe(const std::string &topic, uint32_t queue_size, const boost::function< void(C)> &callback, const VoidConstPtr &tracked_object=VoidConstPtr(), const TransportHints &transport_hints=TransportHints())
mqtt_client::MqttClient::broker_config_
BrokerConfig broker_config_
Broker parameters.
Definition: MqttClient.h:450
ros::WallTime
mqtt_client::MqttClient::kLatencyRosTopicPrefix
static const std::string kLatencyRosTopicPrefix
ROS topic prefix under which ROS2MQTT2ROS latencies are published.
Definition: MqttClient.h:425
mqtt_client::MqttClient::mqtt2ros_
std::map< std::string, Mqtt2RosInterface > mqtt2ros_
MQTT2ROS connection variables sorted by MQTT topic.
Definition: MqttClient.h:475
TimeBase< WallTime, WallDuration >::sec
uint32_t sec
mqtt_client::MqttClient::ClientConfig::certificate
std::filesystem::path certificate
client certificate
Definition: MqttClient.h:364
mqtt_client::MqttClient::ClientConfig::enabled
bool enabled
whether client buffer is enabled
Definition: MqttClient.h:350
TimeBase< WallTime, WallDuration >::nsec
uint32_t nsec
mqtt_client::MqttClient::ros2mqtt_
std::map< std::string, Ros2MqttInterface > ros2mqtt_
ROS2MQTT connection variables sorted by ROS topic.
Definition: MqttClient.h:470
XmlRpc::XmlRpcException
mqtt_client::MqttClient::setup
void setup()
Initializes broker connection and subscriptions.
Definition: MqttClient.cpp:384
mqtt_client::MqttClient::connection_lost
void connection_lost(const std::string &cause) override
Callback for when the client has lost connection to the broker.
Definition: MqttClient.cpp:823
mqtt_client::MqttClient::is_connected_service_
ros::ServiceServer is_connected_service_
ROS Service server for providing connection status.
Definition: MqttClient.h:440
NODELET_INFO
#define NODELET_INFO(...)
XmlRpc::XmlRpcValue::hasMember
bool hasMember(const std::string &name) const
ros::serialization::StreamOverrunException
mqtt_client::MqttClient::BrokerConfig::ca_certificate
std::filesystem::path ca_certificate
public CA certificate trusted by client
Definition: MqttClient.h:340
nodelet::Nodelet
ros::Time
ros::get_environment_variable
bool get_environment_variable(std::string &str, const char *environment_variable)
mqtt_client::MqttClient::ClientConfig::keep_alive_interval
double keep_alive_interval
keep-alive interval
Definition: MqttClient.h:361
ROS_ERROR
#define ROS_ERROR(...)
mqtt_client::MqttClient::BrokerConfig::host
std::string host
broker host
Definition: MqttClient.h:333
mqtt_client::MqttClient::connect
void connect()
Connects to the broker using the member client and options.
Definition: MqttClient.cpp:472
default_value
def default_value(type_)
mqtt_client::serializeRosMessage
void serializeRosMessage(const T &msg, std::vector< uint8_t > &buffer)
Definition: MqttClient.h:541
ros::NodeHandle::param
T param(const std::string &param_name, const T &default_val) const
mqtt_client::MqttClient::ClientConfig::id
std::string id
client unique ID
Definition: MqttClient.h:348
ros::message_traits::Definition::value
static const char * value()
XmlRpcException.h
mqtt_client::primitiveRosMessageToString
bool primitiveRosMessageToString(const topic_tools::ShapeShifter::ConstPtr &msg, std::string &primitive)
Extracts string of primitive data types from ROS message.
Definition: MqttClient.cpp:95
DurationBase< Duration >::toSec
double toSec() const
mqtt_client::MqttClient::BrokerConfig::enabled
bool enabled
whether to connect via SSL/TLS
Definition: MqttClient.h:338
mqtt_client::MqttClient::BrokerConfig::tls
struct mqtt_client::MqttClient::BrokerConfig::@0 tls
SSL/TLS-related variables.
topic_tools::ShapeShifter
mqtt_client::MqttClient::isConnected
bool isConnected()
Returns whether the client is connected to the broker.
Definition: MqttClient.cpp:831
mqtt_client
Namespace for the mqtt_client package.
Definition: MqttClient.h:46
ros::Duration
ros::serialization::serialize
void serialize(Stream &stream, const boost::array< T, N > &t)
mqtt_client::MqttClient::ClientConfig::retained
bool retained
whether last-will is retained
Definition: MqttClient.h:358
mqtt_client::MqttClient::onInit
virtual void onInit() override
Initializes nodelet when nodelet is loaded.
Definition: MqttClient.cpp:147
mqtt_client::MqttClient::loadParameters
void loadParameters()
Loads ROS parameters from parameter server.
Definition: MqttClient.cpp:158
mqtt_client::MqttClient::node_handle_
ros::NodeHandle node_handle_
ROS node handle.
Definition: MqttClient.h:430
XmlRpc::XmlRpcValue
nodelet::Nodelet::getMTNodeHandle
ros::NodeHandle & getMTNodeHandle() const
NODELET_DEBUG
#define NODELET_DEBUG(...)


mqtt_client
Author(s): Lennart Reiher , Bastian Lampe , Christian Wende
autogenerated on Thu Oct 5 2023 02:09:10