datastream_zmq.cpp
Go to the documentation of this file.
1 #include "datastream_zmq.h"
2 #include "ui_datastream_zmq.h"
3 
5 #include <QDebug>
6 #include <QDialog>
7 #include <QIntValidator>
8 #include <QMessageBox>
9 #include <QSettings>
10 #include <chrono>
11 #include <iostream>
12 
13 using namespace PJ;
14 
16  : QDialog(parent), ui(new Ui::DataStreamZMQ)
17 {
18  ui->setupUi(this);
19  ui->lineEditPort->setValidator(new QIntValidator());
20 }
21 
23 {
24  while (ui->layoutOptions->count() > 0)
25  {
26  auto item = ui->layoutOptions->takeAt(0);
27  item->widget()->setParent(nullptr);
28  }
29  delete ui;
30 }
31 
33  : _running(false), _zmq_context(), _zmq_socket(_zmq_context, zmq::socket_type::sub)
34 {
35 }
36 
38 {
39  shutdown();
40 }
41 
42 bool DataStreamZMQ::start(QStringList*)
43 {
44  if (_running)
45  {
46  return _running;
47  }
48 
49  if (parserFactories() == nullptr || parserFactories()->empty())
50  {
51  QMessageBox::warning(nullptr, tr("UDP Server"), tr("No available MessageParsers"),
52  QMessageBox::Ok);
53  _running = false;
54  return false;
55  }
56 
57  bool ok = false;
58 
59  StreamZMQDialog* dialog = new StreamZMQDialog();
60 
61  for (const auto& it : *parserFactories())
62  {
63  dialog->ui->comboBoxProtocol->addItem(it.first);
64 
65  if (auto widget = it.second->optionsWidget())
66  {
67  widget->setVisible(false);
68  dialog->ui->layoutOptions->addWidget(widget);
69  }
70  }
71 
72  // load previous values
73  QSettings settings;
74  QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
75  QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
76  QString topics = settings.value("ZMQ_Subscriber::topics", "").toString();
77  _is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool();
78 
79  QString previous_address = address;
80 
81  connect(dialog->ui->radioConnect, &QRadioButton::toggled, dialog, [&](bool toggled) {
82  dialog->ui->lineEditAddress->setEnabled(toggled);
83  if (toggled)
84  {
85  dialog->ui->lineEditAddress->setText(previous_address);
86  }
87  else
88  {
89  previous_address = dialog->ui->lineEditAddress->text();
90  dialog->ui->lineEditAddress->setText("0.0.0.0");
91  }
92  });
93 
94  if (_is_connect)
95  {
96  dialog->ui->radioConnect->setChecked(true);
97  }
98  else
99  {
100  dialog->ui->radioBind->setChecked(true);
101  }
102 
103  int port = settings.value("ZMQ_Subscriber::port", 9872).toInt();
104 
105  dialog->ui->lineEditAddress->setText(address);
106  dialog->ui->lineEditPort->setText(QString::number(port));
107  dialog->ui->lineEditTopics->setText(topics);
108 
109  connect(dialog->ui->comboBoxProtocol,
110  qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
111  [&](const QString& selected_protocol) {
112  if (_parser_creator)
113  {
114  if (auto prev_widget = _parser_creator->optionsWidget())
115  {
116  prev_widget->setVisible(false);
117  }
118  }
119  _parser_creator = parserFactories()->at(selected_protocol);
120 
121  if (auto widget = _parser_creator->optionsWidget())
122  {
123  widget->setVisible(true);
124  }
125  });
126 
127  dialog->ui->comboBoxProtocol->setCurrentText(protocol);
128 
129  int res = dialog->exec();
130  if (res == QDialog::Rejected)
131  {
132  _running = false;
133  return false;
134  }
135 
136  address = dialog->ui->lineEditAddress->text();
137  port = dialog->ui->lineEditPort->text().toUShort(&ok);
138  protocol = dialog->ui->comboBoxProtocol->currentText();
139  topics = dialog->ui->lineEditTopics->text();
140  _is_connect = dialog->ui->radioConnect->isChecked();
141 
142  _parser = _parser_creator->createParser({}, {}, {}, dataMap());
143 
144  // save back to service
145  settings.setValue("ZMQ_Subscriber::address", address);
146  settings.setValue("ZMQ_Subscriber::protocol", protocol);
147  settings.setValue("ZMQ_Subscriber::port", port);
148  settings.setValue("ZMQ_Subscriber::topics", topics);
149  settings.setValue("ZMQ_Subscriber::is_connect", _is_connect);
150 
151  QString addr =
152  dialog->ui->comboBox->currentText() + address + ":" + QString::number(port);
153  _socket_address = addr.toStdString();
154  if (_is_connect)
155  {
156  _zmq_socket.connect(_socket_address.c_str());
157  }
158  else
159  {
160  _zmq_socket.bind(_socket_address.c_str());
161  }
162 
163  parseTopicFilters(topics);
164  subscribeTopics();
165 
166  // Add a parser for each topic
167  for (const auto& topic : _topic_filters)
168  {
169  _parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap());
170  }
171 
172  _zmq_socket.set(zmq::sockopt::rcvtimeo, 100);
173 
174  qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address);
175  _running = true;
176 
177  _receive_thread = std::thread(&DataStreamZMQ::receiveLoop, this);
178 
179  dialog->deleteLater();
180  return _running;
181 }
182 
184 {
185  if (_running)
186  {
187  _running = false;
188 
189  if (_receive_thread.joinable())
190  {
191  _receive_thread.join();
192  }
193 
195 
196  if (_is_connect)
197  {
199  }
200  else
201  {
203  }
204  }
205 }
206 
208 {
209  while (_running)
210  {
211  zmq::message_t recv_msg;
212  zmq::recv_result_t result = _zmq_socket.recv(recv_msg);
213 
214  // If we did not receive anything, continue
215  if (recv_msg.size() <= 0)
216  {
217  continue;
218  }
219 
220  // If there are more parts, then it is the topic
221  std::string topic = "";
222  if (recv_msg.more())
223  {
224  topic =
225  std::string(reinterpret_cast<const char*>(recv_msg.data()), recv_msg.size());
226 
227  // Then it is the payload
228  recv_msg.rebuild();
229  result = _zmq_socket.recv(recv_msg);
230 
231  // If we did not receive anything, continue
232  if (recv_msg.size() <= 0)
233  {
234  continue;
235  }
236  }
237 
238  PJ::MessageRef msg{ PJ::MessageRef(reinterpret_cast<uint8_t*>(recv_msg.data()),
239  recv_msg.size()) };
240 
241  // If there are more parts, then it is the timestamp
242  double timestamp = 0.0;
243  if (recv_msg.more())
244  {
245  recv_msg.rebuild();
246  result = _zmq_socket.recv(recv_msg);
247 
248  if (recv_msg.size() > 0)
249  {
250  // The timestamp is the seconds since the epoch as a string
251  timestamp = std::stod(
252  std::string(reinterpret_cast<const char*>(recv_msg.data()), recv_msg.size()));
253  }
254  }
255  else
256  {
257  // If there are no more parts, the timestamp is the current time
258  timestamp = std::chrono::duration_cast<std::chrono::microseconds>(
259  std::chrono::high_resolution_clock::now().time_since_epoch())
260  .count() *
261  1e-6;
262  }
263 
264  // Parse the message without a topic if it is empty
265  if (topic.empty())
266  {
267  if (parseMessage(msg, timestamp))
268  {
269  emit this->dataReceived();
270  }
271  }
272  // Otherwise, parse the message with the topic
273  else
274  {
275  if (parseMessage(topic, msg, timestamp))
276  {
277  emit this->dataReceived();
278  }
279  }
280 
281  // Extinguish remaining parts (if any)
282  while (recv_msg.more())
283  {
284  recv_msg.rebuild();
285  result = _zmq_socket.recv(recv_msg);
286  }
287  }
288 }
289 
291 {
292  try
293  {
294  std::lock_guard<std::mutex> lock(mutex());
295  _parser->parseMessage(msg, timestamp);
296  return true;
297  }
298  catch (...)
299  {
300  return false;
301  }
302 }
303 
304 bool DataStreamZMQ::parseMessage(const std::string& topic, const PJ::MessageRef& msg,
305  double& timestamp)
306 {
307  try
308  {
309  std::lock_guard<std::mutex> lock(mutex());
310  // If the topic is not in the map keys, create a new parser
311  if (_parsers.find(topic) == _parsers.end())
312  {
313  _parsers[topic] = _parser_creator->createParser(topic, {}, {}, dataMap());
314  }
315 
316  _parsers[topic]->parseMessage(msg, timestamp);
317  return true;
318  }
319  catch (...)
320  {
321  return false;
322  }
323 }
324 
325 void DataStreamZMQ::parseTopicFilters(const QString& topic_filters)
326 {
327  const QRegExp regex("(,{0,1}\\s+)|(;\\s*)");
328 
329  if (topic_filters.trimmed().size() != 0)
330  {
331  const auto splitted = topic_filters.split(regex);
332 
333  for (const auto& topic : splitted)
334  {
335  _topic_filters.push_back(topic.toStdString());
336  }
337  }
338  else
339  {
340  _topic_filters.push_back("");
341  }
342 }
343 
345 {
346  for (const auto& topic : _topic_filters)
347  {
348  qDebug() << "ZMQ Subscribed topic" << QString::fromStdString(topic);
349 
350  _zmq_socket.set(zmq::sockopt::subscribe, topic);
351  }
352 }
353 
355 {
356  for (const auto& topic : _topic_filters)
357  {
358  qDebug() << "ZMQ Unsubscribed topic" << QString::fromStdString(topic);
359 
360  _zmq_socket.set(zmq::sockopt::unsubscribe, topic);
361  }
362 
363  _topic_filters.clear();
364 }
StreamZMQDialog::StreamZMQDialog
StreamZMQDialog(QWidget *parent=nullptr)
Definition: datastream_zmq.cpp:15
datastream_zmq.h
zmq::message_t
Definition: zmq.hpp:389
DataStreamZMQ
Definition: datastream_zmq.h:24
DataStreamZMQ::_parser
PJ::MessageParserPtr _parser
Definition: datastream_zmq.h:58
DataStreamZMQ::_parsers
std::map< std::string, PJ::MessageParserPtr > _parsers
Definition: datastream_zmq.h:62
zmq::message_t::data
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:572
DataStreamZMQ::receiveLoop
void receiveLoop()
Definition: datastream_zmq.cpp:207
zmq::message_t::more
bool more() const ZMQ_NOTHROW
Definition: zmq.hpp:566
DataStreamZMQ::_zmq_socket
zmq::socket_t _zmq_socket
Definition: datastream_zmq.h:57
zmq::detail::socket_base::unbind
void unbind(std::string const &addr)
Definition: zmq.hpp:1917
DataStreamZMQ::_topic_filters
std::vector< std::string > _topic_filters
Definition: datastream_zmq.h:61
mqtt_test_proto.msg
msg
Definition: mqtt_test_proto.py:43
DataStreamZMQ::parseTopicFilters
void parseTopicFilters(const QString &filters)
Definition: datastream_zmq.cpp:325
ok
ROSCPP_DECL bool ok()
DataStreamZMQ::_receive_thread
std::thread _receive_thread
Definition: datastream_zmq.h:60
DataStreamZMQ::_socket_address
std::string _socket_address
Definition: datastream_zmq.h:59
zmq
Definition: zmq.hpp:222
DataStreamZMQ::DataStreamZMQ
DataStreamZMQ()
Definition: datastream_zmq.cpp:32
DataStreamZMQ::subscribeTopics
void subscribeTopics()
Definition: datastream_zmq.cpp:344
start_test_publisher.timestamp
timestamp
Definition: start_test_publisher.py:32
start_test_publisher.topic
topic
Definition: start_test_publisher.py:31
Ui
Definition: cheatsheet_dialog.h:6
DataStreamZMQ::unsubscribeTopics
void unsubscribeTopics()
Definition: datastream_zmq.cpp:354
StreamZMQDialog::~StreamZMQDialog
~StreamZMQDialog()
Definition: datastream_zmq.cpp:22
DataStreamZMQ::~DataStreamZMQ
virtual ~DataStreamZMQ() override
Definition: datastream_zmq.cpp:37
PJ::DataStreamer::parserFactories
const ParserFactories * parserFactories() const
Definition: datastreamer_base.cpp:34
DataStreamZMQ::shutdown
virtual void shutdown() override
shutdown Stop streaming
Definition: datastream_zmq.cpp:183
PJ::DataStreamer::dataMap
PlotDataMapRef & dataMap()
Definition: datastreamer_base.h:69
DataStreamZMQ::start
virtual bool start(QStringList *) override
start streaming.
Definition: datastream_zmq.cpp:42
zmq::message_t::rebuild
void rebuild()
Definition: zmq.hpp:496
DataStreamZMQ::parseMessage
bool parseMessage(const PJ::MessageRef &msg, double &timestamp)
Definition: datastream_zmq.cpp:290
socket_type
int socket_type
udp_client.addr
addr
Definition: udp_client.py:16
zmq::detail::socket_base::disconnect
void disconnect(std::string const &addr)
Definition: zmq.hpp:1941
PJ::DataStreamer::dataReceived
void dataReceived()
PJ::MessageRef
Definition: messageparser_base.h:28
PJ
Definition: dataloader_base.h:16
PJ::DataStreamer::mutex
std::mutex & mutex()
Definition: datastreamer_base.h:62
DataStreamZMQ::_running
bool _running
Definition: datastream_zmq.h:55
DataStreamZMQ::_is_connect
bool _is_connect
Definition: datastream_zmq.h:64
time_since_epoch
double time_since_epoch()
StreamZMQDialog::ui
Ui::DataStreamZMQ * ui
Definition: datastream_zmq.h:21
DataStreamZMQ::_parser_creator
PJ::ParserFactoryPlugin::Ptr _parser_creator
Definition: datastream_zmq.h:63
zmq::message_t::size
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:582
messageparser_base.h
StreamZMQDialog
Definition: datastream_zmq.h:13


plotjuggler
Author(s): Davide Faconti
autogenerated on Mon May 26 2025 02:22:36