mqtt_client.cpp
Go to the documentation of this file.
#include "mqtt_client.h"

#include <cerrno>
#include <system_error>
#include <utility>

#include <QDebug>
#include <QLoggingCategory>
#include <QMessageBox>
#include <QString>
#include <QtGlobal>
7 
8 #ifdef WIN32
9 #include <windows.h>
10 #include <strsafe.h>
11 #endif
12 
13 #define MQTT_DEBUG 0
14 #define debug() qCDebug(category)
15 
16 static const QLoggingCategory category("MQTTClient");
17 
18 void connect_callback(struct mosquitto* mosq, void* context, int result, int,
19  const mosquitto_property*)
20 {
21  MQTTClient* self = static_cast<MQTTClient*>(context);
22 
23  if (!result)
24  {
25  for (const auto& topic : self->config().topics)
26  {
27  mosquitto_subscribe(mosq, nullptr, topic.c_str(), self->config().qos);
28  }
29  }
30  else
31  {
32  QMessageBox::warning(
33  nullptr, "MQTT Client",
34  QString("Connection error: %1").arg(mosquitto_reason_string(result)),
35  QMessageBox::Ok);
36  }
37  self->_connected = true;
38 }
39 
40 void disconnect_callback(struct mosquitto* mosq, void* context, int result)
41 {
42  MQTTClient* self = static_cast<MQTTClient*>(context);
43 
44  if (self->isConnected() && result == MOSQ_ERR_CONN_LOST)
45  {
46  emit self->disconnected();
47  }
48 }
49 
50 void message_callback(struct mosquitto* mosq, void* context,
51  const struct mosquitto_message* message, const mosquitto_property*)
52 {
53  MQTTClient* self = static_cast<MQTTClient*>(context);
54  self->onMessageReceived(message);
55 }
56 
57 void log_callback(struct mosquitto* mosq, void* context, int log_level, const char* msg)
58 {
59  const std::pair<int, const char*> log_level_map[] = {
60  { MOSQ_LOG_INFO, "MOSQ_LOG_INFO" },
61  { MOSQ_LOG_NOTICE, "MOSQ_LOG_NOTICE" },
62  { MOSQ_LOG_WARNING, "MOSQ_LOG_WARNING" },
63  { MOSQ_LOG_ERR, "MOSQ_LOG_ERR " },
64  { MOSQ_LOG_DEBUG, "MOSQ_LOG_DEBUG" },
65  { MOSQ_LOG_SUBSCRIBE, "MOSQ_LOG_SUBSCRIBE" },
66  { MOSQ_LOG_UNSUBSCRIBE, "MOSQ_LOG_UNSUBSCRIBE" },
67  { MOSQ_LOG_WEBSOCKETS, "MOSQ_LOG_WEBSOCKETS" },
68  };
69 
70  const auto it =
71  std::find_if(std::begin(log_level_map), std::end(log_level_map),
72  [log_level](const auto& pair) { return log_level == pair.first; });
73  if (it == std::end(log_level_map))
74  return;
75 
76  debug() << it->second << ": " << msg;
77 }
78 
79 //----------------------------
80 
82 {
83  mosquitto_lib_init();
84 
85 #if MQTT_DEBUG
86  int major = 0, minor = 0, revision = 0;
87  mosquitto_lib_version(&major, &minor, &revision);
88  debug() << "mosquitto version: " << major << "." << minor << "." << revision;
89 #endif // MQTT_DEBUG
90 }
91 
93 {
94  if (_connected)
95  {
96  disconnect();
97  }
98  mosquitto_lib_cleanup();
99 }
100 
102 {
103  if (_connected)
104  {
105  disconnect();
106  }
107 
108  // Start with a fresh mosquitto instance.
109  Q_ASSERT(_mosq == nullptr);
110  _mosq = mosquitto_new(nullptr, true, this);
111 
112  bool success = configureMosquitto(config);
113  if (!success)
114  {
115  mosquitto_destroy(_mosq);
116  _mosq = nullptr;
117  return false;
118  }
119 
120  _connected = true;
121  _config = config;
122  return true;
123 }
124 
126 {
127  mosquitto_connect_v5_callback_set(_mosq, connect_callback);
128  mosquitto_disconnect_callback_set(_mosq, disconnect_callback);
129  mosquitto_message_v5_callback_set(_mosq, message_callback);
130 #if MQTT_DEBUG
131  mosquitto_log_callback_set(_mosq, log_callback);
132 #endif
133 
134  int rc =
135  mosquitto_int_option(_mosq, MOSQ_OPT_PROTOCOL_VERSION, config.protocol_version);
136  if (rc != MOSQ_ERR_SUCCESS)
137  {
138  QMessageBox::warning(nullptr, "MQTT Client", QString("MQTT initialization failed."),
139  QMessageBox::Ok);
140  debug() << "MQTT initialization failed:" << mosquitto_strerror(rc);
141  return false;
142  }
143 
144  if ((!config.username.empty() || !config.password.empty()))
145  {
146  rc = mosquitto_username_pw_set(_mosq, config.username.c_str(),
147  config.password.c_str());
148  if (rc != MOSQ_ERR_SUCCESS)
149  {
150  QMessageBox::warning(nullptr, "MQTT Client",
151  QString("MQTT initialization failed. Double check username "
152  "and password."),
153  QMessageBox::Ok);
154  debug() << "MQTT username or password error:" << mosquitto_strerror(rc);
155  return false;
156  }
157  }
158 
159  if (config.cafile.empty() == false)
160  {
161  const char* cafile = config.cafile.c_str();
162  const char* certfile = config.certfile.empty() ? nullptr : config.certfile.c_str();
163  const char* keyfile = config.keyfile.empty() ? nullptr : config.keyfile.c_str();
164  rc = mosquitto_tls_set(_mosq, cafile, nullptr, certfile, keyfile, nullptr);
165  if (rc != MOSQ_ERR_SUCCESS)
166  {
167  QMessageBox::warning(nullptr, "MQTT Client",
168  QString("MQTT initialization failed. Double check "
169  "certificates."),
170  QMessageBox::Ok);
171  debug() << "MQTT certificate error:" << mosquitto_strerror(rc);
172  return false;
173  }
174  }
175 
176  rc = mosquitto_max_inflight_messages_set(_mosq, config.max_inflight);
177  if (rc != MOSQ_ERR_SUCCESS)
178  {
179  QMessageBox::warning(nullptr, "MQTT Client", QString("MQTT initialization failed."),
180  QMessageBox::Ok);
181  debug() << "MQTT setting max inflight messages failed:" << mosquitto_strerror(rc);
182  return false;
183  }
184 
185  const mosquitto_property* properties = nullptr; // todo
186 
187  rc = mosquitto_connect_bind_v5(_mosq, config.host.c_str(), config.port,
188  config.keepalive, nullptr, properties);
189  // TODO bind
190  if (rc > 0)
191  {
192  if (rc == MOSQ_ERR_ERRNO)
193  {
194  char err[1024];
195 #ifndef WIN32
196  strerror_r(errno, err, 1024);
197 #else
198  FormatMessage(FORMAT_MESSAGE_FROM_SYSTEM, NULL, errno, 0, (LPTSTR)&err, 1024, NULL);
199 #endif
200  QMessageBox::warning(nullptr, "MQTT Client", QString("Error: %1").arg(err),
201  QMessageBox::Ok);
202  }
203  else
204  {
205  QMessageBox::warning(nullptr, "MQTT Client",
206  QString("Unable to connect (%1)").arg(mosquitto_strerror(rc)),
207  QMessageBox::Ok);
208  }
209  debug() << "MQTT connect failed:" << mosquitto_strerror(rc);
210  return false;
211  }
212 
213  rc = mosquitto_loop_start(_mosq);
214  if (rc == MOSQ_ERR_NOT_SUPPORTED)
215  {
216  // Threaded mode may not be supported on windows (libmosquitto < 2.1).
217  // See https://github.com/eclipse/mosquitto/issues/2707
218  _thread = std::thread([this]() {
219  int rc = mosquitto_loop_forever(this->_mosq, -1, 1);
220  if (rc != MOSQ_ERR_SUCCESS)
221  {
222  debug() << "MQTT loop forever failed:" << mosquitto_strerror(rc);
223  }
224  });
225  }
226  else if (rc != MOSQ_ERR_SUCCESS)
227  {
228  QMessageBox::warning(nullptr, "MQTT Client", QString("Failed to start MQTT client"),
229  QMessageBox::Ok);
230  debug() << "MQTT start loop failed:" << mosquitto_strerror(rc);
231  return false;
232  }
233  return true;
234 }
235 
237 {
238  if (_connected)
239  {
240  mosquitto_disconnect(_mosq);
241  mosquitto_loop_stop(_mosq, true);
242  if (_thread.joinable())
243  {
244  _thread.join();
245  }
246  mosquitto_destroy(_mosq);
247  _mosq = nullptr;
248  }
249  _connected = false;
250  _topics_set.clear();
251  _message_callbacks.clear();
252 }
253 
255 {
256  return _connected;
257 }
258 
259 void MQTTClient::addMessageCallback(const std::string& topic,
260  MQTTClient::TopicCallback callback)
261 {
262  std::unique_lock<std::mutex> lk(_mutex);
263  _message_callbacks[topic] = callback;
264 }
265 
266 void MQTTClient::onMessageReceived(const mosquitto_message* message)
267 {
268  std::unique_lock<std::mutex> lk(_mutex);
269 
270  _topics_set.insert(message->topic);
271 
272  auto it = _message_callbacks.find(message->topic);
273  if (it != _message_callbacks.end())
274  {
275  it->second(message);
276  }
277 }
278 
280 {
281  return _config;
282 }
283 
284 std::unordered_set<std::string> MQTTClient::getTopicList()
285 {
286  std::unique_lock<std::mutex> lk(_mutex);
287  return _topics_set;
288 }
289 
290 void MQTTClient::subscribe(const std::string& topic, int qos)
291 {
292  if (_connected)
293  {
294  mosquitto_subscribe(_mosq, nullptr, topic.c_str(), qos);
295  }
296 }
297 
298 void MQTTClient::unsubscribe(const std::string& topic)
299 {
300  if (_connected)
301  {
302  mosquitto_unsubscribe(_mosq, nullptr, topic.c_str());
303  }
304 }
MQTTClient::subscribe
void subscribe(const std::string &topic, int qos)
Definition: mqtt_client.cpp:290
MQTTClient::connect
bool connect(const MosquittoConfig &config)
Definition: mqtt_client.cpp:101
MQTTClient::configureMosquitto
bool configureMosquitto(const MosquittoConfig &config)
Definition: mqtt_client.cpp:125
MQTTClient::onMessageReceived
void onMessageReceived(const mosquitto_message *message)
Definition: mqtt_client.cpp:266
MosquittoConfig::host
std::string host
Definition: mosquitto_config.h:12
MQTTClient::disconnect
void disconnect()
Definition: mqtt_client.cpp:236
MQTTClient::MQTTClient
MQTTClient()
Definition: mqtt_client.cpp:81
MosquittoConfig::password
std::string password
Definition: mosquitto_config.h:22
MosquittoConfig::port
int port
Definition: mosquitto_config.h:13
MQTTClient::TopicCallback
std::function< void(const mosquitto_message *)> TopicCallback
Definition: mqtt_client.h:30
arg
auto arg(const Char *name, const T &arg) -> detail::named_arg< Char, T >
Definition: core.h:1875
MosquittoConfig::username
std::string username
Definition: mosquitto_config.h:21
MQTTClient::_thread
std::thread _thread
Definition: mqtt_client.h:57
MQTTClient
Definition: mqtt_client.h:14
mqtt_test_proto.msg
msg
Definition: mqtt_test_proto.py:43
message_callback
void message_callback(struct mosquitto *mosq, void *context, const struct mosquitto_message *message, const mosquitto_property *)
Definition: mqtt_client.cpp:50
MQTTClient::config
const MosquittoConfig & config() const
Definition: mqtt_client.cpp:279
MQTTClient::_connected
bool _connected
Definition: mqtt_client.h:33
MQTTClient::_topics_set
std::unordered_set< std::string > _topics_set
Definition: mqtt_client.h:54
MosquittoConfig::keyfile
std::string keyfile
Definition: mosquitto_config.h:27
category
static const QLoggingCategory category("MQTTClient")
MQTTClient::_config
MosquittoConfig _config
Definition: mqtt_client.h:56
start_test_publisher.topic
topic
Definition: start_test_publisher.py:22
MosquittoConfig::max_inflight
unsigned int max_inflight
Definition: mosquitto_config.h:20
disconnect_callback
void disconnect_callback(struct mosquitto *mosq, void *context, int result)
Definition: mqtt_client.cpp:40
MQTTClient::unsubscribe
void unsubscribe(const std::string &topic)
Definition: mqtt_client.cpp:298
MosquittoConfig
Definition: mosquitto_config.h:8
MQTTClient::disconnected
void disconnected()
MosquittoConfig::cafile
std::string cafile
Definition: mosquitto_config.h:24
MQTTClient::_message_callbacks
std::unordered_map< std::string, TopicCallback > _message_callbacks
Definition: mqtt_client.h:53
MQTTClient::~MQTTClient
~MQTTClient()
Definition: mqtt_client.cpp:92
connect_callback
void connect_callback(struct mosquitto *mosq, void *context, int result, int, const mosquitto_property *)
Definition: mqtt_client.cpp:18
MQTTClient::_mosq
mosquitto * _mosq
Definition: mqtt_client.h:52
debug
#define debug()
Definition: mqtt_client.cpp:14
MosquittoConfig::keepalive
int keepalive
Definition: mosquitto_config.h:11
MQTTClient::isConnected
bool isConnected() const
Definition: mqtt_client.cpp:254
MQTTClient::addMessageCallback
void addMessageCallback(const std::string &topic, TopicCallback callback)
Definition: mqtt_client.cpp:259
presentation_type::debug
@ debug
MQTTClient::_mutex
std::mutex _mutex
Definition: mqtt_client.h:55
MosquittoConfig::certfile
std::string certfile
Definition: mosquitto_config.h:26
log_callback
void log_callback(struct mosquitto *mosq, void *context, int log_level, const char *msg)
Definition: mqtt_client.cpp:57
MQTTClient::getTopicList
std::unordered_set< std::string > getTopicList()
Definition: mqtt_client.cpp:284
nullptr
#define nullptr
Definition: backward.hpp:386
MosquittoConfig::protocol_version
int protocol_version
Definition: mosquitto_config.h:10
mqtt_client.h


plotjuggler
Author(s): Davide Faconti
autogenerated on Mon Nov 11 2024 03:23:45