datastream_mqtt.cpp
Go to the documentation of this file.
1 #include "datastream_mqtt.h"
2 #include "ui_datastream_mqtt.h"
3 #include <QMessageBox>
4 #include <QSettings>
5 #include <QDebug>
6 #include <QUuid>
7 #include <QIntValidator>
8 #include <QMessageBox>
9 
10 DataStreamMQTT::DataStreamMQTT() : _running(false)
11 {
12  _notification_action = new QAction(this);
13 
14  connect(_notification_action, &QAction::triggered, this, [this]() {
15  QMessageBox::warning(nullptr, "MQTT error",
16  QString("Failed to parse %1 messages").arg(_failed_parsing),
17  QMessageBox::Ok);
18 
19  if (_failed_parsing > 0)
20  {
21  _failed_parsing = 0;
23  }
24  });
25 
26  _mosq = std::make_shared<MQTTClient>();
27  _dialog = new MQTT_Dialog(_mosq);
28 }
29 
31 {
32  shutdown();
33  delete _dialog;
34 }
35 
36 bool DataStreamMQTT::start(QStringList*)
37 {
38  if (_running)
39  {
40  return _running;
41  }
42 
43  // cleanup notifications
44  _failed_parsing = 0;
45  emit notificationsChanged(0);
46 
47  if (parserFactories() == nullptr || parserFactories()->empty())
48  {
49  QMessageBox::warning(nullptr, tr("MQTT Client"), tr("No available MessageParsers"),
50  QMessageBox::Ok);
51  _running = false;
52  return false;
53  }
54 
55  bool first_start = _dialog->ui->comboBoxProtocol->count() == 0;
56 
57  if (first_start)
58  {
59  // change the section of the dialog related to protocols
60  for (const auto& it : *parserFactories())
61  {
62  _dialog->ui->comboBoxProtocol->addItem(it.first);
63 
64  if (auto widget = it.second->optionsWidget())
65  {
66  widget->setVisible(false);
67  _dialog->ui->layoutOptions->addWidget(widget);
68  }
69  }
70 
71  connect(_dialog->ui->comboBoxProtocol,
72  qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
74  }
75 
76  _running = false;
77 
78  QSettings settings;
79  _protocol = settings.value("MosquittoMQTT::serialization_protocol", "JSON").toString();
80  _dialog->ui->comboBoxProtocol->setCurrentText(_protocol);
81 
82  if (_dialog->exec() == QDialog::Rejected)
83  {
84  return false;
85  }
86  _protocol = _dialog->ui->comboBoxProtocol->currentText();
87 
88  // remove all previous subscriptions and create new ones
89  for (const auto& topic : _mosq->config().topics)
90  {
91  _mosq->unsubscribe(topic);
92  }
93 
94  for (const auto& item : _dialog->ui->listWidget->selectedItems())
95  {
96  MQTTClient::TopicCallback callback = [this](const mosquitto_message* message) {
97  onMessageReceived(message);
98  };
99 
100  std::string topic_name = item->text().toStdString();
101  _mosq->subscribe(topic_name, _mosq->config().qos);
102  _mosq->addMessageCallback(topic_name, callback);
103  }
104 
105  _running = true;
106  return _running;
107 }
108 
110 {
111  if (_running)
112  {
113  _running = false;
114  _parsers.clear();
115  _topic_to_parse.clear();
116  dataMap().clear();
117  }
118 }
119 
121 {
122  return _running;
123 }
124 
125 void DataStreamMQTT::onComboProtocolChanged(const QString& selected_protocol)
126 {
128  {
129  if (auto prev_widget = _current_parser_creator->optionsWidget())
130  {
131  prev_widget->setVisible(false);
132  }
133  }
134  _current_parser_creator = parserFactories()->at(selected_protocol);
135 
136  if (auto widget = _current_parser_creator->optionsWidget())
137  {
138  widget->setVisible(true);
139  }
140 }
141 
142 void DataStreamMQTT::onMessageReceived(const mosquitto_message* message)
143 {
144  std::unique_lock<std::mutex> lk(mutex());
145 
146  auto it = _parsers.find(message->topic);
147  if (it == _parsers.end())
148  {
149  auto& parser_factory = parserFactories()->at(_protocol);
150  auto parser = parser_factory->createParser({ message->topic }, {}, {}, dataMap());
151  it = _parsers.insert({ message->topic, parser }).first;
152  }
153  auto& parser = it->second;
154 
155  bool result = false;
156  try
157  {
158  MessageRef msg(static_cast<uint8_t*>(message->payload), message->payloadlen);
159 
160  using namespace std::chrono;
161  auto ts = high_resolution_clock::now().time_since_epoch();
162  double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());
163 
164  result = parser->parseMessage(msg, timestamp);
165  }
166  catch (std::exception&)
167  {
168  }
169 
170  emit dataReceived();
171 
172  if (!result)
173  {
174  _failed_parsing++;
176  }
177 }
PJ::DataStreamer::notificationsChanged
void notificationsChanged(int active_notification_count)
detail::first
auto first(const T &value, const Tail &...) -> const T &
Definition: compile.h:60
MQTT_Dialog
Definition: mqtt_dialog.h:12
DataStreamMQTT::_failed_parsing
int _failed_parsing
Definition: datastream_mqtt.h:66
DataStreamMQTT::DataStreamMQTT
DataStreamMQTT()
Definition: datastream_mqtt.cpp:10
MQTT_Dialog::ui
Ui::DataStreamMQTT * ui
Definition: mqtt_dialog.h:19
MQTTClient::TopicCallback
std::function< void(const mosquitto_message *)> TopicCallback
Definition: mqtt_client.h:29
DataStreamMQTT::isRunning
virtual bool isRunning() const override
isRunning
Definition: datastream_mqtt.cpp:120
DataStreamMQTT::_mosq
MQTTClient::Ptr _mosq
Definition: datastream_mqtt.h:58
arg
auto arg(const Char *name, const T &arg) -> detail::named_arg< Char, T >
Definition: core.h:1875
DataStreamMQTT::_notification_action
QAction * _notification_action
Definition: datastream_mqtt.h:65
DataStreamMQTT::start
virtual bool start(QStringList *) override
start streaming.
Definition: datastream_mqtt.cpp:36
DataStreamMQTT::_current_parser_creator
ParserFactoryPlugin::Ptr _current_parser_creator
Definition: datastream_mqtt.h:69
mqtt_test_proto.msg
msg
Definition: mqtt_test_proto.py:43
detail::count
constexpr auto count() -> size_t
Definition: core.h:1222
DataStreamMQTT::_running
bool _running
Definition: datastream_mqtt.h:54
start_test_publisher.topic
topic
Definition: start_test_publisher.py:22
PJ::DataStreamer::parserFactories
const ParserFactories * parserFactories() const
Definition: datastreamer_base.cpp:34
PJ::PlotDataMapRef::clear
void clear()
Definition: plotdata.cpp:125
PJ::DataStreamer::dataMap
PlotDataMapRef & dataMap()
Definition: datastreamer_base.h:69
udp_client.parser
parser
Definition: udp_client.py:9
datastream_mqtt.h
DataStreamMQTT::shutdown
virtual void shutdown() override
shutdown Stop streaming
Definition: datastream_mqtt.cpp:109
DataStreamMQTT::_protocol
QString _protocol
Definition: datastream_mqtt.h:61
PJ::DataStreamer::dataReceived
void dataReceived()
PJ::MessageRef
Definition: messageparser_base.h:28
DataStreamMQTT::~DataStreamMQTT
~DataStreamMQTT() override
Definition: datastream_mqtt.cpp:30
DataStreamMQTT::onMessageReceived
void onMessageReceived(const mosquitto_message *message)
Definition: datastream_mqtt.cpp:142
PJ::DataStreamer::mutex
std::mutex & mutex()
Definition: datastreamer_base.h:62
DataStreamMQTT::_topic_to_parse
QString _topic_to_parse
Definition: datastream_mqtt.h:63
DataStreamMQTT::_parsers
std::unordered_map< std::string, PJ::MessageParserPtr > _parsers
Definition: datastream_mqtt.h:56
DataStreamMQTT::onComboProtocolChanged
void onComboProtocolChanged(const QString &)
Definition: datastream_mqtt.cpp:125
DataStreamMQTT::_dialog
MQTT_Dialog * _dialog
Definition: datastream_mqtt.h:68


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Aug 11 2024 02:24:22