2 #include "ui_datastream_zmq.h" 8 #include <QIntValidator> 18 ui->lineEditPort->setValidator(
new QIntValidator());
23 while (
ui->layoutOptions->count() > 0)
25 auto item =
ui->layoutOptions->takeAt(0);
26 item->widget()->setParent(
nullptr);
32 : _running(false), _zmq_context(), _zmq_socket(_zmq_context,
zmq::
socket_type::sub)
50 QMessageBox::warning(
nullptr, tr(
"UDP Server"), tr(
"No available MessageParsers"),
62 dialog->
ui->comboBoxProtocol->addItem(it.first);
64 if (
auto widget = it.second->optionsWidget())
66 widget->setVisible(
false);
67 dialog->
ui->layoutOptions->addWidget(widget);
73 QString address = settings.value(
"ZMQ_Subscriber::address",
"localhost").toString();
74 QString protocol = settings.value(
"ZMQ_Subscriber::protocol",
"JSON").toString();
75 QString topics = settings.value(
"ZMQ_Subscriber::topics",
"").toString();
77 int port = settings.value(
"ZMQ_Subscriber::port", 9872).toInt();
79 dialog->
ui->lineEditAddress->setText(address);
80 dialog->
ui->lineEditPort->setText(QString::number(port));
81 dialog->
ui->lineEditTopics->setText(topics);
85 connect(dialog->
ui->comboBoxProtocol,
86 qOverload<const QString&>(&QComboBox::currentIndexChanged),
this,
87 [&](
const QString& selected_protocol) {
90 if (auto prev_widget = parser_creator->optionsWidget())
92 prev_widget->setVisible(false);
97 if (
auto widget = parser_creator->optionsWidget())
99 widget->setVisible(
true);
103 dialog->ui->comboBoxProtocol->setCurrentText(protocol);
105 int res = dialog->exec();
106 if (res == QDialog::Rejected)
112 address = dialog->ui->lineEditAddress->text();
113 port = dialog->ui->lineEditPort->text().toUShort(&
ok);
114 protocol = dialog->ui->comboBoxProtocol->currentText();
115 topics = dialog->ui->lineEditTopics->text();
120 settings.setValue(
"ZMQ_Subscriber::address", address);
121 settings.setValue(
"ZMQ_Subscriber::protocol", protocol);
122 settings.setValue(
"ZMQ_Subscriber::port", port);
123 settings.setValue(
"ZMQ_Subscriber::topics", topics);
126 (dialog->ui->comboBox->currentText() + address +
":" + QString::number(port))
136 qDebug() <<
"ZMQ listening on address" << QString::fromStdString(
_socket_address);
141 dialog->deleteLater();
169 if (recv_msg.
size() > 0)
171 using namespace std::chrono;
172 auto ts = high_resolution_clock::now().time_since_epoch();
173 double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).
count());
188 std::lock_guard<std::mutex> lock(
mutex());
189 _parser->parseMessage(msg, timestamp);
200 const QRegExp regex(
"(,{0,1}\\s+)|(;\\s*)");
202 if (topic_filters.trimmed().size() != 0)
204 const auto splitted = topic_filters.split(regex);
206 for (
const auto&
topic : splitted)
221 qDebug() <<
"ZMQ Subscribed topic" << QString::fromStdString(
topic);
231 qDebug() <<
"ZMQ Unsubscribed topic" << QString::fromStdString(
topic);
236 _topic_filters.clear();
virtual void shutdown() override
shutdown Stop streaming
bool parseMessage(const PJ::MessageRef &msg, double ×tamp)
zmq::socket_t _zmq_socket
std::shared_ptr< ParserFactoryPlugin > Ptr
virtual ~DataStreamZMQ() override
void * data() ZMQ_NOTHROW
const ParserFactories * parserFactories() const
constexpr auto count() -> size_t
std::vector< std::string > _topic_filters
std::thread _receive_thread
PlotDataMapRef & dataMap()
PJ::MessageParserPtr _parser
virtual bool start(QStringList *) override
start streaming.
std::string _socket_address
void connect(std::string const &addr)
size_t size() const ZMQ_NOTHROW
StreamZMQDialog(QWidget *parent=nullptr)
void parseTopicFilters(const QString &filters)
void disconnect(std::string const &addr)