2 #include "ui_datastream_mqtt.h" 7 #include <QIntValidator> 16 QMessageBox::warning(
nullptr,
"MQTT error",
27 _mosq = std::make_shared<MQTTClient>();
50 QMessageBox::warning(
nullptr,tr(
"MQTT Client"), tr(
"No available MessageParsers"), QMessageBox::Ok);
55 bool first_start =
_dialog->
ui->comboBoxProtocol->count() == 0;
62 _dialog->
ui->comboBoxProtocol->addItem( it.first );
64 if(
auto widget = it.second->optionsWidget() )
66 widget->setVisible(
false);
67 _dialog->
ui->layoutOptions->addWidget( widget );
72 qOverload<const QString &>(&QComboBox::currentIndexChanged),
79 _protocol = settings.value(
"MosquittoMQTT::serialization_protocol",
"JSON").toString();
82 if(
_dialog->exec() == QDialog::Rejected )
89 for(
const auto&
topic:
_mosq->config().topics )
94 for (
const auto& item:
_dialog->
ui->listWidget->selectedItems())
101 std::string topic_name = item->text().toStdString();
102 _mosq->subscribe(topic_name,
_mosq->config().qos);
103 _mosq->addMessageCallback(topic_name, callback);
133 prev_widget->setVisible(
false);
140 widget->setVisible(
true);
146 std::unique_lock<std::mutex> lk(
mutex());
148 auto it =
_parsers.find(message->topic);
152 auto parser = parser_factory->createParser({message->topic}, {}, {},
dataMap());
155 auto&
parser = it->second;
159 MessageRef msg( static_cast<uint8_t*>(message->payload), message->payloadlen);
161 using namespace std::chrono;
162 auto ts = high_resolution_clock::now().time_since_epoch();
163 double timestamp = 1e-6* double( duration_cast<microseconds>(ts).
count() );
165 result =
parser->parseMessage(msg, timestamp);
167 catch (std::exception& ) {}
~DataStreamMQTT() override
void onComboProtocolChanged(const QString &)
void onMessageReceived(const mosquitto_message *message)
std::function< void(const mosquitto_message *)> TopicCallback
virtual void shutdown() override
shutdown Stop streaming
std::unordered_map< std::string, PJ::MessageParserPtr > _parsers
auto arg(const Char *name, const T &arg) -> detail::named_arg< Char, T >
ParserFactoryPlugin::Ptr _current_parser_creator
virtual bool isRunning() const override
isRunning
const ParserFactories * parserFactories() const
constexpr auto count() -> size_t
QAction * _notification_action
virtual bool start(QStringList *) override
start streaming.
PlotDataMapRef & dataMap()
const T & first(const T &value, const Tail &...)
void notificationsChanged(int active_notification_count)