2 #include "ui_datastream_zmq.h" 8 #include <QIntValidator> 16 ui->lineEditPort->setValidator(
new QIntValidator() );
21 while(
ui->layoutOptions->count() > 0)
23 auto item =
ui->layoutOptions->takeAt(0);
24 item->widget()->setParent(
nullptr);
32 _zmq_socket(_zmq_context,
zmq::socket_type::sub)
50 QMessageBox::warning(
nullptr,tr(
"UDP Server"), tr(
"No available MessageParsers"), QMessageBox::Ok);
61 dialog->
ui->comboBoxProtocol->addItem( it.first );
63 if(
auto widget = it.second->optionsWidget() )
65 widget->setVisible(
false);
66 dialog->
ui->layoutOptions->addWidget( widget );
72 QString address = settings.value(
"ZMQ_Subscriber::address",
"localhost").toString();
73 QString protocol = settings.value(
"ZMQ_Subscriber::protocol",
"JSON").toString();
74 int port = settings.value(
"ZMQ_Subscriber::port", 9872).toInt();
77 dialog->
ui->lineEditAddress->setText( address );
78 dialog->
ui->lineEditPort->setText( QString::number(port) );
80 std::shared_ptr<PJ::MessageParserCreator> parser_creator;
82 connect(dialog->
ui->comboBoxProtocol, qOverload<const QString &>( &QComboBox::currentIndexChanged),
this,
86 QWidget* prev_widget = parser_creator->optionsWidget();
87 prev_widget->setVisible(false);
91 if(
auto widget = parser_creator->optionsWidget() ){
92 widget->setVisible(
true);
96 dialog->ui->comboBoxProtocol->setCurrentText(protocol);
98 int res = dialog->exec();
99 if( res == QDialog::Rejected )
105 address = dialog->ui->lineEditAddress->text();
106 port = dialog->ui->lineEditPort->text().toUShort(&ok);
107 protocol = dialog->ui->comboBoxProtocol->currentText();
112 settings.setValue(
"ZMQ_Subscriber::address", address);
113 settings.setValue(
"ZMQ_Subscriber::protocol", protocol);
114 settings.setValue(
"ZMQ_Subscriber::port", port);
117 (dialog->ui->comboBox->currentText()+
118 address+
":" + QString::number(port)).toStdString();
125 qDebug() <<
"ZMQ listening on address" << QString::fromStdString(
_socket_address );
130 dialog->deleteLater();
155 if( recv_msg.
size() > 0 )
157 using namespace std::chrono;
158 auto ts = high_resolution_clock::now().time_since_epoch();
159 double timestamp = 1e-6* double( duration_cast<microseconds>(ts).
count() );
164 std::lock_guard<std::mutex> lock(
mutex());
165 _parser->parseMessage(msg, timestamp);
166 }
catch (std::exception& err)
168 QMessageBox::warning(
nullptr,
169 tr(
"ZMQ Subscriber"),
170 tr(
"Problem parsing the message. ZMQ Subscriber will be stopped.\n%1").
arg(err.what()),
virtual void shutdown() override
zmq::socket_t _zmq_socket
virtual ~DataStreamZMQ() override
MessageParserFactory * availableParsers()
void * data() ZMQ_NOTHROW
std::thread _receive_thread
PlotDataMapRef & dataMap()
detail::named_arg< Char, T > arg(const Char *name, const T &arg)
PJ::MessageParserPtr _parser
virtual bool start(QStringList *) override
std::string _socket_address
void connect(std::string const &addr)
size_t size() const ZMQ_NOTHROW
StreamZMQDialog(QWidget *parent=nullptr)
void disconnect(std::string const &addr)