datastream_mqtt.cpp
Go to the documentation of this file.
1 #include "datastream_mqtt.h"
2 #include "ui_datastream_mqtt.h"
3 #include <QMessageBox>
4 #include <QSettings>
5 #include <QDebug>
6 #include <QUuid>
7 #include <QIntValidator>
8 #include <QMessageBox>
9 
11  _running(false)
12 {
13  _notification_action = new QAction(this);
14 
15  connect(_notification_action, &QAction::triggered, this, [this]() {
16  QMessageBox::warning(nullptr, "MQTT error",
17  QString("Failed to parse %1 messages").arg(_failed_parsing),
18  QMessageBox::Ok);
19 
20  if (_failed_parsing > 0)
21  {
22  _failed_parsing = 0;
24  }
25  });
26 
27  _mosq = std::make_shared<MQTTClient>();
28  _dialog = new MQTT_Dialog(_mosq);
29 }
30 
32 {
33  shutdown();
34  delete _dialog;
35 }
36 
37 bool DataStreamMQTT::start(QStringList *)
38 {
39  if (_running)
40  {
41  return _running;
42  }
43 
44  //cleanup notifications
45  _failed_parsing = 0;
46  emit notificationsChanged(0);
47 
48  if( parserFactories() == nullptr || parserFactories()->empty() )
49  {
50  QMessageBox::warning(nullptr,tr("MQTT Client"), tr("No available MessageParsers"), QMessageBox::Ok);
51  _running = false;
52  return false;
53  }
54 
55  bool first_start = _dialog->ui->comboBoxProtocol->count() == 0;
56 
57  if( first_start )
58  {
59  // change the section of the dialog related to protocols
60  for( const auto& it: *parserFactories())
61  {
62  _dialog->ui->comboBoxProtocol->addItem( it.first );
63 
64  if(auto widget = it.second->optionsWidget() )
65  {
66  widget->setVisible(false);
67  _dialog->ui->layoutOptions->addWidget( widget );
68  }
69  }
70 
71  connect(_dialog->ui->comboBoxProtocol,
72  qOverload<const QString &>(&QComboBox::currentIndexChanged),
74  }
75 
76  _running = false;
77 
78  QSettings settings;
79  _protocol = settings.value("MosquittoMQTT::serialization_protocol", "JSON").toString();
80  _dialog->ui->comboBoxProtocol->setCurrentText(_protocol);
81 
82  if( _dialog->exec() == QDialog::Rejected )
83  {
84  return false;
85  }
86  _protocol = _dialog->ui->comboBoxProtocol->currentText();
87 
88  // remove all previous subscriptions and create new ones
89  for( const auto& topic: _mosq->config().topics )
90  {
91  _mosq->unsubscribe(topic);
92  }
93 
94  for (const auto& item: _dialog->ui->listWidget->selectedItems())
95  {
96  MQTTClient::TopicCallback callback = [this](const mosquitto_message* message)
97  {
98  onMessageReceived(message);
99  };
100 
101  std::string topic_name = item->text().toStdString();
102  _mosq->subscribe(topic_name, _mosq->config().qos);
103  _mosq->addMessageCallback(topic_name, callback);
104  }
105 
106  _running = true;
107  return _running;
108 }
109 
111 {
112  if( _running )
113  {
114  _running = false;
115  _parsers.clear();
116  _topic_to_parse.clear();
117  dataMap().clear();
118  }
119 }
120 
122 {
123  return _running;
124 }
125 
126 
127 void DataStreamMQTT::onComboProtocolChanged(const QString& selected_protocol)
128 {
130  {
131  if( auto prev_widget = _current_parser_creator->optionsWidget())
132  {
133  prev_widget->setVisible(false);
134  }
135  }
136  _current_parser_creator = parserFactories()->at(selected_protocol);
137 
138  if (auto widget = _current_parser_creator->optionsWidget())
139  {
140  widget->setVisible(true);
141  }
142 }
143 
144 void DataStreamMQTT::onMessageReceived(const mosquitto_message *message)
145 {
146  std::unique_lock<std::mutex> lk(mutex());
147 
148  auto it = _parsers.find(message->topic);
149  if( it == _parsers.end() )
150  {
151  auto& parser_factory = parserFactories()->at( _protocol );
152  auto parser = parser_factory->createParser({message->topic}, {}, {}, dataMap());
153  it = _parsers.insert( {message->topic, parser} ).first;
154  }
155  auto& parser = it->second;
156 
157  bool result = false;
158  try {
159  MessageRef msg( static_cast<uint8_t*>(message->payload), message->payloadlen);
160 
161  using namespace std::chrono;
162  auto ts = high_resolution_clock::now().time_since_epoch();
163  double timestamp = 1e-6* double( duration_cast<microseconds>(ts).count() );
164 
165  result = parser->parseMessage(msg, timestamp);
166  }
167  catch (std::exception& ) {}
168 
169  emit dataReceived();
170 
171  if( !result )
172  {
173  _failed_parsing++;
175  }
176 }
177 
~DataStreamMQTT() override
void onComboProtocolChanged(const QString &)
MQTTClient::Ptr _mosq
void onMessageReceived(const mosquitto_message *message)
std::function< void(const mosquitto_message *)> TopicCallback
Definition: mqtt_client.h:29
virtual void shutdown() override
shutdown Stop streaming
std::unordered_map< std::string, PJ::MessageParserPtr > _parsers
QString _topic_to_parse
MQTT_Dialog * _dialog
auto arg(const Char *name, const T &arg) -> detail::named_arg< Char, T >
Definition: core.h:1736
ParserFactoryPlugin::Ptr _current_parser_creator
std::mutex & mutex()
virtual bool isRunning() const override
isRunning
const ParserFactories * parserFactories() const
constexpr auto count() -> size_t
Definition: core.h:1050
QAction * _notification_action
virtual bool start(QStringList *) override
start streaming.
PlotDataMapRef & dataMap()
const T & first(const T &value, const Tail &...)
Definition: compile.h:178
Ui::DataStreamMQTT * ui
Definition: mqtt_dialog.h:19
void notificationsChanged(int active_notification_count)


plotjuggler
Author(s): Davide Faconti
autogenerated on Mon Jun 19 2023 03:01:01