datastream_mqtt.cpp
Go to the documentation of this file.
1 #include "datastream_mqtt.h"
2 #include "ui_datastream_mqtt.h"
3 #include <QMessageBox>
4 #include <QSettings>
5 #include <QDebug>
6 #include <QUuid>
7 
8 class MQTT_Dialog: public QDialog
9 {
10 public:
12  QDialog(nullptr),
13  ui(new Ui::DataStreamMQTT)
14  {
15  ui->setupUi(this);
16 
17  static QString uuid = QString::number(qrand());
18  ui->lineEditClientID->setText(tr("Plotjuggler-") + uuid);
19  }
20 
22  while( ui->layoutOptions->count() > 0)
23  {
24  auto item = ui->layoutOptions->takeAt(0);
25  item->widget()->setParent(nullptr);
26  }
27  delete ui;
28  }
29 
30  Ui::DataStreamMQTT* ui;
31 };
32 
33 //---------------------------------------------
34 
35 QString Code(int rc)
36 {
37  switch(rc)
38  {
39  case MQTTASYNC_FAILURE: return"FAILURE";
40  case MQTTASYNC_PERSISTENCE_ERROR: return"PERSISTENCE_ERROR";
41  case MQTTASYNC_DISCONNECTED: return"DISCONNECTED";
42  case MQTTASYNC_MAX_MESSAGES_INFLIGHT: return"MAX_MESSAGES_INFLIGHT";
43  case MQTTASYNC_BAD_UTF8_STRING: return"BAD_UTF8_STRING";
44  case MQTTASYNC_NULL_PARAMETER: return"NULL_PARAMETER";
45  case MQTTASYNC_TOPICNAME_TRUNCATED: return"TOPICNAME_TRUNCATED";
46  case MQTTASYNC_BAD_STRUCTURE: return"BAD_STRUCTURE";
47  case MQTTASYNC_BAD_QOS: return"BAD_QOS";
48  case MQTTASYNC_NO_MORE_MSGIDS: return"NO_MORE_MSGIDS";
49  case MQTTASYNC_OPERATION_INCOMPLETE: return"OPERATION_INCOMPLETE";
50  case MQTTASYNC_MAX_BUFFERED_MESSAGES: return"MAX_BUFFERED_MESSAGES";
51  case MQTTASYNC_SSL_NOT_SUPPORTED: return"SSL_NOT_SUPPORTED";
52  case MQTTASYNC_BAD_PROTOCOL: return"BAD_PROTOCOL";
53  case MQTTASYNC_BAD_MQTT_OPTION: return"BAD_MQTT_OPTION";
54  case MQTTASYNC_WRONG_MQTT_VERSION: return"WRONG_MQTT_VERSION";
55  case MQTTASYNC_0_LEN_WILL_TOPIC: return"0_LEN_WILL_TOPIC";
56  }
57  return QString::number(rc);
58 }
59 
60 void ConnectionLost(void *context, char *cause)
61 {
62  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
63 
65 
66  qDebug() <<"MQTT Connection lost. Reconnecting...";
67 
68  conn_opts.keepAliveInterval = 20;
69  conn_opts.cleansession = 1;
70 
71  int rc = MQTTAsync_connect(_this->_client, &conn_opts);
72 
73  if (rc != MQTTASYNC_SUCCESS)
74  {
75  _this->_error_msg = QString("Failed to start connect, return code %1").arg(Code(rc));
76  _this->_finished = true;
77  }
78 }
79 
80 int MessageArrived(void *context, char *topicName,
81  int topicLen, MQTTAsync_message *message)
82 {
83  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
84 
85  auto it = _this->_parsers.find(topicName);
86  if( it == _this->_parsers.end() )
87  {
88  auto parser = _this->availableParsers()->at( _this->_protocol )->createInstance({}, _this->dataMap());
89  it = _this->_parsers.insert( {topicName, parser} ).first;
90  }
91  auto& parser = it->second;
92 
93  try {
94  MessageRef msg( static_cast<uint8_t*>(message->payload), message->payloadlen);
95 
96  using namespace std::chrono;
97  auto ts = high_resolution_clock::now().time_since_epoch();
98  double timestamp = 1e-6* double( duration_cast<microseconds>(ts).count() );
99 
100  parser->parseMessage(msg, timestamp);
101 
102  } catch (std::exception& ) {
103  _this->_protocol_issue = true;
104  return 0;
105  }
106 
107 // printf(" topic: %s\n", topicName);
108 // printf(" message: %.*s\n", message->payloadlen, (char*)message->payload);
109  MQTTAsync_freeMessage(&message);
110  MQTTAsync_free(topicName);
111 
112  emit _this->dataReceived();
113  return 1;
114 }
115 
117 {
118  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
119  _this->_disconnection_done = true;
120 }
121 
123 {
124  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
125  _this->_disconnection_done = true;
126 }
127 
129 {
130  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
131  _this->_subscribed = true;
132 }
133 
135 {
136  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
137  _this->_error_msg = QString("Subscription Failure. Code %1").arg(Code(response->code));
138  _this->_finished = true;
139 }
140 
141 
143 {
144  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
145  _this->_error_msg = QString("Connection Failure. Code %1").arg(Code(response->code));
146  _this->_finished = true;
147 }
148 
149 
151 {
152  DataStreamMQTT* _this = static_cast<DataStreamMQTT*>(context);
153  MQTTAsync client = _this->_client;
154 
156 
157  opts.onSuccess = onSubscribe;
159  opts.context = _this;
160 
161  int rc = MQTTAsync_subscribe(
162  client,
163  _this->_topic_filter.toStdString().c_str(),
164  _this->_qos,
165  &opts);
166 
167  if ( rc != MQTTASYNC_SUCCESS)
168  {
169  _this->_error_msg = QString("Failed to start subscribe, return code %1").arg(Code(rc));
170  _this->_finished = true;
171  }
172 }
173 
174 
176  _protocol_issue(false),
177  _running(false)
178 {
179  _protocol_issue_timer.setSingleShot(false);
180  _protocol_issue_timer.setInterval(500);
181 
182  connect(&_protocol_issue_timer, &QTimer::timeout, this,
183  [this](){
184  if(_protocol_issue){
185  _protocol_issue = false;
186  shutdown();
187 
188  QMessageBox::warning(nullptr,tr("DataStream MQTT"),
189  tr("Exception while parsing the message. Probably the format was not recognized (%1 used)").arg( this->_protocol),
190  QMessageBox::Ok);
191  emit this->closed();
192 
193  }
194  });
195 }
196 
197 bool DataStreamMQTT::start(QStringList *)
198 {
199  if (_running)
200  {
201  return _running;
202  }
203 
204  if( !availableParsers() )
205  {
206  QMessageBox::warning(nullptr,tr("Websocket Server"), tr("No available MessageParsers"), QMessageBox::Ok);
207  _running = false;
208  return false;
209  }
210 
211  MQTT_Dialog* dialog = new MQTT_Dialog();
212 
213  for( const auto& it: *availableParsers())
214  {
215  dialog->ui->comboBoxProtocol->addItem( it.first );
216 
217  if(auto widget = it.second->optionsWidget() )
218  {
219  widget->setVisible(false);
220  dialog->ui->layoutOptions->addWidget( widget );
221  }
222  }
223 
224  std::shared_ptr<MessageParserCreator> parser_creator;
225 
226  connect(dialog->ui->comboBoxProtocol, qOverload<const QString &>( &QComboBox::currentIndexChanged), this,
227  [&](QString protocol)
228  {
229  if( parser_creator ){
230  QWidget* prev_widget = parser_creator->optionsWidget();
231  prev_widget->setVisible(false);
232  }
233  parser_creator = availableParsers()->at( protocol );
234 
235  if(auto widget = parser_creator->optionsWidget() ){
236  widget->setVisible(true);
237  }
238  });
239 
240  // load previous values
241  QSettings settings;
242  QString address = settings.value("DataStreamMQTT::address").toString();
243  _protocol = settings.value("DataStreamMQTT::protocol", "JSON").toString();
244  _topic_filter = settings.value("DataStreamMQTT::filter").toString();
245  _qos = settings.value("DataStreamMQTT::qos", 0).toInt();
246  QString username = settings.value("DataStreamMQTT::username", "").toString();
247  QString password = settings.value("DataStreamMQTT::password", "").toString();
248 
249  dialog->ui->lineEditAddress->setText( address );
250  dialog->ui->comboBoxProtocol->setCurrentText(_protocol);
251  dialog->ui->lineEditTopicFilter->setText( _topic_filter );
252  dialog->ui->comboBoxQoS->setCurrentIndex(_qos);
253  dialog->ui->lineEditUsername->setText(username);
254  dialog->ui->lineEditPassword->setText(password);
255 
256  if( dialog->exec() == QDialog::Rejected )
257  {
258  return false;
259  }
260 
261  address = dialog->ui->lineEditAddress->text();
262  _protocol = dialog->ui->comboBoxProtocol->currentText();
263  _topic_filter = dialog->ui->lineEditTopicFilter->text();
264  _qos = dialog->ui->comboBoxQoS->currentIndex();
265  QString cliend_id = dialog->ui->lineEditClientID->text();
266  username = dialog->ui->lineEditUsername->text();
267  password = dialog->ui->lineEditPassword->text();
268 
269  dialog->deleteLater();
270 
271  // save back to service
272  settings.setValue("DataStreamMQTT::address", address);
273  settings.setValue("DataStreamMQTT::filter", _topic_filter);
274  settings.setValue("DataStreamMQTT::protocol", _protocol);
275  settings.setValue("DataStreamMQTT::qos", _qos);
276  settings.setValue("DataStreamMQTT::username", username);
277  settings.setValue("DataStreamMQTT::password", password);
278 
279  _subscribed = false;
280  _finished = false;
281  _protocol_issue = false;
282 
284 
285  int rc = MQTTAsync_create(&_client,
286  address.toStdString().c_str(),
287  cliend_id.toStdString().c_str(),
289  nullptr);
290 
291  if (rc != MQTTASYNC_SUCCESS)
292  {
293  QMessageBox::warning(nullptr,tr("DataStream MQTT"),
294  tr("Failed create client MQTT. Error code %1").arg(Code(rc)),
295  QMessageBox::Ok);
296  return false;
297  }
298 
300  if (rc != MQTTASYNC_SUCCESS)
301  {
302  QMessageBox::warning(nullptr,tr("DataStream MQTT"),
303  tr("Failed to set callbacks. Error code %1").arg(Code(rc)),
304  QMessageBox::Ok);
306  return false;
307  }
308 
309  conn_opts.keepAliveInterval = 20;
310  conn_opts.cleansession = 1;
311  conn_opts.onSuccess = onConnect;
312  conn_opts.onFailure = onConnectFailure;
313  conn_opts.context = this;
314  conn_opts.username = username.toStdString().c_str();
315  conn_opts.password = password.toStdString().c_str();
316 
317  rc = MQTTAsync_connect(_client, &conn_opts);
318 
319  if (rc != MQTTASYNC_SUCCESS)
320  {
321  QMessageBox::warning(nullptr,tr("DataStream MQTT"),
322  tr("Failed to start connection. Error code %1").arg(Code(rc)),
323  QMessageBox::Ok);
325  return false;
326  }
327 
328  while (!_subscribed && !_finished)
329  {
330  std::this_thread::sleep_for(std::chrono::milliseconds(10));
331  }
332 
333  if (_finished)
334  {
335  QMessageBox::warning(nullptr,tr("DataStream MQTT"),
336  tr("Failed to start connection. Message: %1").arg(_error_msg),
337  QMessageBox::Ok);
339  return false;
340  }
341 
342  _running = true;
343  _protocol_issue_timer.start(500);
344  return _running;
345 }
346 
348 {
349  if( _running )
350  {
351  _disconnection_done = false;
353 
354  disc_opts.context = this;
355  disc_opts.onSuccess = onDisconnect;
356  disc_opts.onFailure = onDisconnectFailure;
357 
358  int rc = MQTTAsync_disconnect(_client, &disc_opts);
359  if (rc != MQTTASYNC_SUCCESS)
360  {
361  QMessageBox::warning(nullptr,tr("DataStream MQTT"),
362  tr("Failed to disconned cleanly. Error code %1").arg(rc),
363  QMessageBox::Ok);
365  _running = false;
366  return;
367  }
368 
369  while (!_disconnection_done)
370  {
371  std::this_thread::sleep_for(std::chrono::milliseconds(10));
372  }
373 
375  _running = false;
376  _parsers.clear();
377  }
378 }
379 
381 {
382  return _running;
383 }
384 
386 {
387  shutdown();
388 }
389 
390 
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1255
virtual ~DataStreamMQTT() override
#define MQTTASYNC_NULL_PARAMETER
Definition: MQTTAsync.h:140
#define MQTTASYNC_SSL_NOT_SUPPORTED
Definition: MQTTAsync.h:171
#define nullptr
Definition: backward.hpp:386
#define MQTTAsync_responseOptions_initializer
Definition: MQTTAsync.h:746
#define MQTTASYNC_FAILURE
Definition: MQTTAsync.h:118
int MQTTAsync_disconnect(MQTTAsync handle, const MQTTAsync_disconnectOptions *options)
Definition: MQTTAsync.c:3923
int MQTTAsync_setCallbacks(MQTTAsync handle, void *context, MQTTAsync_connectionLost *cl, MQTTAsync_messageArrived *ma, MQTTAsync_deliveryComplete *dc)
Definition: MQTTAsync.c:3062
#define MQTTASYNC_BAD_UTF8_STRING
Definition: MQTTAsync.h:136
virtual void shutdown() override
QString Code(int rc)
struct pubsub_opts opts
Definition: paho_c_pub.c:42
std::unordered_map< std::string, PJ::MessageParserPtr > _parsers
void * MQTTAsync
Definition: MQTTAsync.h:239
void MQTTAsync_free(void *memory)
Definition: MQTTAsync.c:2626
void MQTTAsync_freeMessage(MQTTAsync_message **message)
Definition: MQTTAsync.c:2615
#define MQTTASYNC_DISCONNECTED
Definition: MQTTAsync.h:127
void onConnect(void *context, MQTTAsync_successData *response)
int MQTTAsync_connect(MQTTAsync handle, const MQTTAsync_connectOptions *options)
Definition: MQTTAsync.c:3480
MQTTAsync_connectOptions conn_opts
Definition: paho_c_sub.c:191
int MQTTAsync_subscribe(MQTTAsync handle, const char *topic, int qos, MQTTAsync_responseOptions *response)
Definition: MQTTAsync.c:4121
MessageParserFactory * availableParsers()
#define MQTTASYNC_OPERATION_INCOMPLETE
Definition: MQTTAsync.h:163
virtual bool isRunning() const override
constexpr size_t count()
Definition: core.h:960
#define MQTTASYNC_BAD_MQTT_OPTION
Definition: MQTTAsync.h:181
#define MQTTASYNC_BAD_QOS
Definition: MQTTAsync.h:155
int MQTTAsync_create(MQTTAsync *handle, const char *serverURI, const char *clientId, int persistence_type, void *persistence_context)
Definition: MQTTAsync.c:737
QTimer _protocol_issue_timer
char * username
Definition: test6.c:58
#define MQTTAsync_disconnectOptions_initializer
Definition: MQTTAsync.h:1422
void onDisconnectFailure(void *context, MQTTAsync_failureData *response)
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:702
MQTTAsync_onFailure * onFailure
Definition: MQTTAsync.h:1393
#define MQTTASYNC_NO_MORE_MSGIDS
Definition: MQTTAsync.h:159
void onSubscribeFailure(void *context, MQTTAsync_failureData *response)
#define MQTTASYNC_MAX_BUFFERED_MESSAGES
Definition: MQTTAsync.h:167
#define MQTTAsync_connectOptions_initializer
Definition: MQTTAsync.h:1335
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:696
#define MQTTASYNC_BAD_STRUCTURE
Definition: MQTTAsync.h:151
void ConnectionLost(void *context, char *cause)
#define MQTTASYNC_WRONG_MQTT_VERSION
Definition: MQTTAsync.h:185
virtual bool start(QStringList *) override
MQTTAsync _client
PlotDataMapRef & dataMap()
MQTTAsync client
Definition: test6.c:276
detail::named_arg< Char, T > arg(const Char *name, const T &arg)
Definition: core.h:1656
void onConnectFailure(void *context, MQTTAsync_failureData *response)
void MQTTAsync_destroy(MQTTAsync *handle)
Definition: MQTTAsync.c:2554
#define MQTTASYNC_SUCCESS
Definition: MQTTAsync.h:113
Ui::DataStreamMQTT * ui
dictionary context
Definition: test2.py:57
#define MQTTCLIENT_PERSISTENCE_NONE
int MessageArrived(void *context, char *topicName, int topicLen, MQTTAsync_message *message)
QString _topic_filter
#define MQTTASYNC_PERSISTENCE_ERROR
Definition: MQTTAsync.h:122
char * password
Definition: test6.c:59
#define MQTTASYNC_BAD_PROTOCOL
Definition: MQTTAsync.h:177
void onSubscribe(void *context, MQTTAsync_successData *response)
#define MQTTASYNC_TOPICNAME_TRUNCATED
Definition: MQTTAsync.h:146
def timestamp()
Definition: mqttsas.py:143
enum MQTTReasonCodes rc
Definition: test10.c:1112
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1249
#define MQTTASYNC_MAX_MESSAGES_INFLIGHT
Definition: MQTTAsync.h:132
#define MQTTASYNC_0_LEN_WILL_TOPIC
Definition: MQTTAsync.h:189
void onDisconnect(void *context, MQTTAsync_successData *response)
MQTTAsync_onSuccess * onSuccess
Definition: MQTTAsync.h:1387


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:47:34