2 #include "ui_datastream_zmq.h"
7 #include <QIntValidator>
19 ui->lineEditPort->setValidator(
new QIntValidator());
24 while (
ui->layoutOptions->count() > 0)
26 auto item =
ui->layoutOptions->takeAt(0);
27 item->widget()->setParent(
nullptr);
33 : _running(false), _zmq_context(), _zmq_socket(_zmq_context,
zmq::
socket_type::sub)
51 QMessageBox::warning(
nullptr, tr(
"UDP Server"), tr(
"No available MessageParsers"),
63 dialog->
ui->comboBoxProtocol->addItem(it.first);
65 if (
auto widget = it.second->optionsWidget())
67 widget->setVisible(
false);
68 dialog->
ui->layoutOptions->addWidget(widget);
74 QString address = settings.value(
"ZMQ_Subscriber::address",
"localhost").toString();
75 QString protocol = settings.value(
"ZMQ_Subscriber::protocol",
"JSON").toString();
76 QString topics = settings.value(
"ZMQ_Subscriber::topics",
"").toString();
77 _is_connect = settings.value(
"ZMQ_Subscriber::is_connect",
true).toBool();
79 QString previous_address = address;
81 connect(dialog->
ui->radioConnect, &QRadioButton::toggled, dialog, [&](
bool toggled) {
82 dialog->ui->lineEditAddress->setEnabled(toggled);
85 dialog->ui->lineEditAddress->setText(previous_address);
89 previous_address = dialog->ui->lineEditAddress->text();
90 dialog->ui->lineEditAddress->setText(
"0.0.0.0");
96 dialog->ui->radioConnect->setChecked(
true);
100 dialog->ui->radioBind->setChecked(
true);
103 int port = settings.value(
"ZMQ_Subscriber::port", 9872).toInt();
105 dialog->ui->lineEditAddress->setText(address);
106 dialog->ui->lineEditPort->setText(QString::number(port));
107 dialog->ui->lineEditTopics->setText(topics);
109 connect(dialog->ui->comboBoxProtocol,
110 qOverload<const QString&>(&QComboBox::currentIndexChanged),
this,
111 [&](
const QString& selected_protocol) {
114 if (auto prev_widget = _parser_creator->optionsWidget())
116 prev_widget->setVisible(false);
119 _parser_creator = parserFactories()->at(selected_protocol);
121 if (
auto widget = _parser_creator->optionsWidget())
123 widget->setVisible(true);
127 dialog->ui->comboBoxProtocol->setCurrentText(protocol);
129 int res = dialog->exec();
130 if (res == QDialog::Rejected)
136 address = dialog->ui->lineEditAddress->text();
137 port = dialog->ui->lineEditPort->text().toUShort(&ok);
138 protocol = dialog->ui->comboBoxProtocol->currentText();
139 topics = dialog->ui->lineEditTopics->text();
140 _is_connect = dialog->ui->radioConnect->isChecked();
142 _parser = _parser_creator->createParser({}, {}, {}, dataMap());
145 settings.setValue(
"ZMQ_Subscriber::address", address);
146 settings.setValue(
"ZMQ_Subscriber::protocol", protocol);
147 settings.setValue(
"ZMQ_Subscriber::port", port);
148 settings.setValue(
"ZMQ_Subscriber::topics", topics);
149 settings.setValue(
"ZMQ_Subscriber::is_connect", _is_connect);
152 dialog->ui->comboBox->currentText() + address +
":" + QString::number(port);
153 _socket_address =
addr.toStdString();
156 _zmq_socket.connect(_socket_address.c_str());
160 _zmq_socket.bind(_socket_address.c_str());
163 parseTopicFilters(topics);
167 for (
const auto&
topic : _topic_filters)
169 _parsers[
topic] = _parser_creator->createParser(
topic, {}, {}, dataMap());
172 _zmq_socket.set(zmq::sockopt::rcvtimeo, 100);
174 qDebug() <<
"ZMQ listening on address" << QString::fromStdString(_socket_address);
179 dialog->deleteLater();
212 zmq::recv_result_t result =
_zmq_socket.recv(recv_msg);
215 if (recv_msg.
size() <= 0)
221 std::string
topic =
"";
225 std::string(
reinterpret_cast<const char*
>(recv_msg.
data()), recv_msg.
size());
232 if (recv_msg.
size() <= 0)
248 if (recv_msg.
size() > 0)
252 std::string(
reinterpret_cast<const char*
>(recv_msg.
data()), recv_msg.
size()));
258 timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
282 while (recv_msg.
more())
294 std::lock_guard<std::mutex> lock(
mutex());
309 std::lock_guard<std::mutex> lock(
mutex());
327 const QRegExp regex(
"(,{0,1}\\s+)|(;\\s*)");
329 if (topic_filters.trimmed().size() != 0)
331 const auto splitted = topic_filters.split(regex);
333 for (
const auto&
topic : splitted)
348 qDebug() <<
"ZMQ Subscribed topic" << QString::fromStdString(
topic);
358 qDebug() <<
"ZMQ Unsubscribed topic" << QString::fromStdString(
topic);