datastream_zmq.cpp
Go to the documentation of this file.
1 #include "datastream_zmq.h"
2 #include "ui_datastream_zmq.h"
3 
4 #include <QMessageBox>
5 #include <QDebug>
6 #include <QSettings>
7 #include <QDialog>
8 #include <QIntValidator>
9 #include <chrono>
11 
12 using namespace PJ;
13 
15  : QDialog(parent), ui(new Ui::DataStreamZMQ)
16 {
17  ui->setupUi(this);
18  ui->lineEditPort->setValidator(new QIntValidator());
19 }
20 
22 {
23  while (ui->layoutOptions->count() > 0)
24  {
25  auto item = ui->layoutOptions->takeAt(0);
26  item->widget()->setParent(nullptr);
27  }
28  delete ui;
29 }
30 
32  : _running(false), _zmq_context(), _zmq_socket(_zmq_context, zmq::socket_type::sub)
33 {
34 }
35 
37 {
38  shutdown();
39 }
40 
41 bool DataStreamZMQ::start(QStringList*)
42 {
43  if (_running)
44  {
45  return _running;
46  }
47 
48  if (parserFactories() == nullptr || parserFactories()->empty())
49  {
50  QMessageBox::warning(nullptr, tr("UDP Server"), tr("No available MessageParsers"),
51  QMessageBox::Ok);
52  _running = false;
53  return false;
54  }
55 
56  bool ok = false;
57 
58  StreamZMQDialog* dialog = new StreamZMQDialog();
59 
60  for (const auto& it : *parserFactories())
61  {
62  dialog->ui->comboBoxProtocol->addItem(it.first);
63 
64  if (auto widget = it.second->optionsWidget())
65  {
66  widget->setVisible(false);
67  dialog->ui->layoutOptions->addWidget(widget);
68  }
69  }
70 
71  // load previous values
72  QSettings settings;
73  QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
74  QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
75  QString topics = settings.value("ZMQ_Subscriber::topics", "").toString();
76 
77  int port = settings.value("ZMQ_Subscriber::port", 9872).toInt();
78 
79  dialog->ui->lineEditAddress->setText(address);
80  dialog->ui->lineEditPort->setText(QString::number(port));
81  dialog->ui->lineEditTopics->setText(topics);
82 
83  ParserFactoryPlugin::Ptr parser_creator;
84 
85  connect(dialog->ui->comboBoxProtocol,
86  qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
87  [&](const QString& selected_protocol) {
88  if (parser_creator)
89  {
90  if (auto prev_widget = parser_creator->optionsWidget())
91  {
92  prev_widget->setVisible(false);
93  }
94  }
95  parser_creator = parserFactories()->at(selected_protocol);
96 
97  if (auto widget = parser_creator->optionsWidget())
98  {
99  widget->setVisible(true);
100  }
101  });
102 
103  dialog->ui->comboBoxProtocol->setCurrentText(protocol);
104 
105  int res = dialog->exec();
106  if (res == QDialog::Rejected)
107  {
108  _running = false;
109  return false;
110  }
111 
112  address = dialog->ui->lineEditAddress->text();
113  port = dialog->ui->lineEditPort->text().toUShort(&ok);
114  protocol = dialog->ui->comboBoxProtocol->currentText();
115  topics = dialog->ui->lineEditTopics->text();
116 
117  _parser = parser_creator->createParser({}, {}, {}, dataMap());
118 
119  // save back to service
120  settings.setValue("ZMQ_Subscriber::address", address);
121  settings.setValue("ZMQ_Subscriber::protocol", protocol);
122  settings.setValue("ZMQ_Subscriber::port", port);
123  settings.setValue("ZMQ_Subscriber::topics", topics);
124 
126  (dialog->ui->comboBox->currentText() + address + ":" + QString::number(port))
127  .toStdString();
128 
130 
131  parseTopicFilters(topics);
132  subscribeTopics();
133 
134  _zmq_socket.set(zmq::sockopt::rcvtimeo, 100);
135 
136  qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address);
137  _running = true;
138 
139  _receive_thread = std::thread(&DataStreamZMQ::receiveLoop, this);
140 
141  dialog->deleteLater();
142  return _running;
143 }
144 
146 {
147  if (_running)
148  {
149  _running = false;
150 
151  if (_receive_thread.joinable())
152  {
153  _receive_thread.join();
154  }
155 
157 
159  }
160 }
161 
163 {
164  while (_running)
165  {
166  zmq::message_t recv_msg;
167  zmq::recv_result_t result = _zmq_socket.recv(recv_msg);
168 
169  if (recv_msg.size() > 0)
170  {
171  using namespace std::chrono;
172  auto ts = high_resolution_clock::now().time_since_epoch();
173  double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());
174 
175  PJ::MessageRef msg(reinterpret_cast<uint8_t*>(recv_msg.data()), recv_msg.size());
176  if (parseMessage(msg, timestamp))
177  {
178  emit this->dataReceived();
179  }
180  }
181  }
182 }
183 
184 bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp)
185 {
186  try
187  {
188  std::lock_guard<std::mutex> lock(mutex());
189  _parser->parseMessage(msg, timestamp);
190  return true;
191  }
192  catch (...)
193  {
194  return false;
195  }
196 }
197 
198 void DataStreamZMQ::parseTopicFilters(const QString& topic_filters)
199 {
200  const QRegExp regex("(,{0,1}\\s+)|(;\\s*)");
201 
202  if (topic_filters.trimmed().size() != 0)
203  {
204  const auto splitted = topic_filters.split(regex);
205 
206  for (const auto& topic : splitted)
207  {
208  _topic_filters.push_back(topic.toStdString());
209  }
210  }
211  else
212  {
213  _topic_filters.push_back("");
214  }
215 }
216 
218 {
219  for (const auto& topic : _topic_filters)
220  {
221  qDebug() << "ZMQ Subscribed topic" << QString::fromStdString(topic);
222 
223  _zmq_socket.set(zmq::sockopt::subscribe, topic);
224  }
225 }
226 
228 {
229  for (const auto& topic : _topic_filters)
230  {
231  qDebug() << "ZMQ Unsubscribed topic" << QString::fromStdString(topic);
232 
233  _zmq_socket.set(zmq::sockopt::unsubscribe, topic);
234  }
235 
236  _topic_filters.clear();
237 }
virtual void shutdown() override
shutdown Stop streaming
bool parseMessage(const PJ::MessageRef &msg, double &timestamp)
zmq::socket_t _zmq_socket
std::shared_ptr< ParserFactoryPlugin > Ptr
virtual ~DataStreamZMQ() override
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:561
std::mutex & mutex()
Definition: lz4.c:1706
const ParserFactories * parserFactories() const
constexpr auto count() -> size_t
Definition: core.h:1050
Ui::DataStreamZMQ * ui
std::vector< std::string > _topic_filters
std::thread _receive_thread
PlotDataMapRef & dataMap()
PJ::MessageParserPtr _parser
virtual bool start(QStringList *) override
start streaming.
std::string _socket_address
void connect(std::string const &addr)
Definition: zmq.hpp:1812
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:568
StreamZMQDialog(QWidget *parent=nullptr)
void parseTopicFilters(const QString &filters)
void disconnect(std::string const &addr)
Definition: zmq.hpp:1821
void unsubscribeTopics()
int socket_type
Definition: zmq.hpp:224


plotjuggler
Author(s): Davide Faconti
autogenerated on Mon Jun 19 2023 03:01:02