2 #include "ui_datastream_zmq.h"
8 #include <QIntValidator>
// Restrict the port field to integers in the 16-bit port range (0-65535).
// The validator is parented to the line edit because QLineEdit::setValidator()
// does not take ownership; an unparented validator would never be destroyed.
18 ui->lineEditPort->setValidator(
new QIntValidator(0, 65535, ui->lineEditPort));
// Drain layoutOptions one item at a time, detaching each item's widget from
// its parent.
23 while (
ui->layoutOptions->count() > 0)
// NOTE(review): QLayout::takeAt() hands ownership of the returned item to the
// caller; no delete of `item` is visible in this excerpt - confirm it is not leaked.
25 auto item =
ui->layoutOptions->takeAt(0);
// NOTE(review): widget() is dereferenced without a null check - presumably every
// item in this layout wraps a widget; verify.
26 item->widget()->setParent(
nullptr);
// Member initializers: the object starts with _running set to false, the ZMQ
// context is default-constructed, and the socket is created on that context
// with socket type `sub`.
32 : _running(false), _zmq_context(), _zmq_socket(_zmq_context,
zmq::
socket_type::sub)
// Warn the user that no message parser is available. The dialog title names
// the ZMQ subscriber (matching the "ZMQ_Subscriber::" settings keys) instead of
// an unrelated "UDP Server".
50 QMessageBox::warning(
nullptr, tr(
"ZMQ Subscriber"), tr(
"No available MessageParsers"),
// `it` is presumably the current entry of a loop over the available parser
// factories (the loop header is not visible here). Its key becomes an entry of
// the protocol combo box.
62 dialog->
ui->comboBoxProtocol->addItem(it.first);
// If the entry provides an options widget, hide it and add it to the dialog's
// options layout.
64 if (
auto widget = it.second->optionsWidget())
66 widget->setVisible(
false);
67 dialog->
ui->layoutOptions->addWidget(widget);
// Read the stored connection options; the second argument of each value()
// call is the fallback used when the key is absent.
73 QString address = settings.value(
"ZMQ_Subscriber::address",
"localhost").toString();
74 QString protocol = settings.value(
"ZMQ_Subscriber::protocol",
"JSON").toString();
75 QString topics = settings.value(
"ZMQ_Subscriber::topics",
"").toString();
76 bool is_connect = settings.value(
"ZMQ_Subscriber::is_connect",
true).toBool();
// Copy of the address that the radio-button handler writes back into the
// address field.
78 QString previous_address = address;
// React to the "connect" radio button being toggled.
// NOTE(review): the lambda captures locals (dialog, previous_address) by
// reference, so it is only valid while the enclosing scope is alive.
80 connect(dialog->
ui->radioConnect, &QRadioButton::toggled, dialog, [&](
bool toggled) {
// The address field is editable only while the button is checked.
81 dialog->ui->lineEditAddress->setEnabled(toggled);
// Presumably the checked branch (condition not visible): restore the
// remembered address.
84 dialog->ui->lineEditAddress->setText(previous_address);
// Presumably the unchecked branch: remember what the field contained, then
// show "*" in its place.
88 previous_address = dialog->ui->lineEditAddress->text();
89 dialog->ui->lineEditAddress->setText(
"*");
// Initial radio-button state; presumably one of the two is chosen from
// is_connect (the condition is not visible here).
95 dialog->ui->radioConnect->setChecked(
true);
99 dialog->ui->radioBind->setChecked(
true);
// Stored port, falling back to 9872 when the key is absent.
102 int port = settings.value(
"ZMQ_Subscriber::port", 9872).toInt();
// Pre-fill the dialog fields with the stored address, port and topics.
104 dialog->ui->lineEditAddress->setText(address);
105 dialog->ui->lineEditPort->setText(QString::number(port));
106 dialog->ui->lineEditTopics->setText(topics);
// When the protocol combo box selection changes, swap which parser options
// widget is visible.
// NOTE(review): the QString overload of QComboBox::currentIndexChanged is
// deprecated in Qt 5.15 and absent in Qt 6 - confirm the targeted Qt version.
110 connect(dialog->ui->comboBoxProtocol,
111 qOverload<const QString&>(&QComboBox::currentIndexChanged),
this,
112 [&](
const QString& selected_protocol) {
// Hide the options widget of the parser factory selected so far, if it has one.
115 if (auto prev_widget = parser_creator->optionsWidget())
117 prev_widget->setVisible(false);
// Switch to the factory registered under the newly selected protocol name.
120 parser_creator = parserFactories()->at(selected_protocol);
// Show the new factory's options widget, if it has one.
122 if (
auto widget = parser_creator->optionsWidget())
124 widget->setVisible(true);
// Select the stored protocol in the combo box.
128 dialog->ui->comboBoxProtocol->setCurrentText(protocol);
// Run the dialog modally. What happens on rejection is not visible in this
// excerpt.
130 int res = dialog->exec();
131 if (res == QDialog::Rejected)
// Read the user's choices back from the dialog widgets.
137 address = dialog->ui->lineEditAddress->text();
// toUShort() reports conversion success through `ok`.
// NOTE(review): no check of `ok` is visible here - confirm it is handled.
138 port = dialog->ui->lineEditPort->text().toUShort(&ok);
139 protocol = dialog->ui->comboBoxProtocol->currentText();
140 topics = dialog->ui->lineEditTopics->text();
141 is_connect = dialog->ui->radioConnect->isChecked();
// Create the parser from the currently selected factory, passing dataMap().
143 _parser = parser_creator->createParser({}, {}, {}, dataMap());
// Store the chosen options under the same "ZMQ_Subscriber::" keys they are
// read from.
146 settings.setValue(
"ZMQ_Subscriber::address", address);
147 settings.setValue(
"ZMQ_Subscriber::protocol", protocol);
148 settings.setValue(
"ZMQ_Subscriber::port", port);
149 settings.setValue(
"ZMQ_Subscriber::topics", topics);
150 settings.setValue(
"ZMQ_Subscriber::is_connect", is_connect);
// Tail of the expression that builds the endpoint string: combo box text +
// address + ":" + port. Presumably the combo box holds the transport prefix
// (e.g. "tcp://") - TODO confirm.
153 dialog->ui->comboBox->currentText() + address +
":" + QString::number(port);
154 _socket_address = addr.toStdString();
// Either connect to or bind on the endpoint; presumably selected by
// is_connect (the branch condition is not visible here).
157 _zmq_socket.connect(_socket_address.c_str());
161 _zmq_socket.bind(_socket_address.c_str());
// Hand the topics string entered in the dialog to parseTopicFilters().
164 parseTopicFilters(topics);
// Set the socket's receive timeout option to 100 (ZMQ_RCVTIMEO is expressed in
// milliseconds).
167 _zmq_socket.set(zmq::sockopt::rcvtimeo, 100);
169 qDebug() <<
"ZMQ listening on address" << QString::fromStdString(_socket_address);
// The dialog is no longer needed; schedule its deletion.
174 dialog->deleteLater();
// Receive one message into recv_msg.
200 zmq::recv_result_t result =
_zmq_socket.recv(recv_msg);
// Only non-empty messages are handled.
202 if (recv_msg.
size() > 0)
// Timestamp in seconds: microseconds since the clock's epoch, scaled by 1e-6.
// NOTE(review): the epoch of high_resolution_clock is implementation-defined -
// confirm it matches the time base expected downstream.
204 using namespace std::chrono;
205 auto ts = high_resolution_clock::now().time_since_epoch();
206 double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).
count());
// Hold the lock returned by mutex() until `lock` goes out of scope.
221 std::lock_guard<std::mutex> lock(
mutex());
// Separator pattern: an optional comma followed by one or more whitespace
// characters, or a semicolon followed by optional whitespace. A comma with no
// whitespace after it does not match, so "a,b" is not split.
233 const QRegExp regex(
"(,{0,1}\\s+)|(;\\s*)");
// Skip splitting when the input is empty or whitespace-only.
235 if (topic_filters.trimmed().size() != 0)
237 const auto splitted = topic_filters.split(regex);
// Visit each resulting topic string.
239 for (
const auto&
topic : splitted)
// Debug trace naming the topic that was subscribed.
254 qDebug() <<
"ZMQ Subscribed topic" << QString::fromStdString(
topic);
// Debug trace naming the topic that was unsubscribed.
264 qDebug() <<
"ZMQ Unsubscribed topic" << QString::fromStdString(
topic);