datastream_zmq.cpp
Go to the documentation of this file.
1 #include "datastream_zmq.h"
2 #include "ui_datastream_zmq.h"
3 
4 #include <QMessageBox>
5 #include <QDebug>
6 #include <QSettings>
7 #include <QDialog>
8 #include <QIntValidator>
9 #include <chrono>
11 
12 using namespace PJ;
13 
15  : QDialog(parent), ui(new Ui::DataStreamZMQ)
16 {
17  ui->setupUi(this);
18  ui->lineEditPort->setValidator(new QIntValidator());
19 }
20 
22 {
23  while (ui->layoutOptions->count() > 0)
24  {
25  auto item = ui->layoutOptions->takeAt(0);
26  item->widget()->setParent(nullptr);
27  }
28  delete ui;
29 }
30 
32  : _running(false), _zmq_context(), _zmq_socket(_zmq_context, zmq::socket_type::sub)
33 {
34 }
35 
37 {
38  shutdown();
39 }
40 
41 bool DataStreamZMQ::start(QStringList*)
42 {
43  if (_running)
44  {
45  return _running;
46  }
47 
48  if (parserFactories() == nullptr || parserFactories()->empty())
49  {
50  QMessageBox::warning(nullptr, tr("UDP Server"), tr("No available MessageParsers"),
51  QMessageBox::Ok);
52  _running = false;
53  return false;
54  }
55 
56  bool ok = false;
57 
58  StreamZMQDialog* dialog = new StreamZMQDialog();
59 
60  for (const auto& it : *parserFactories())
61  {
62  dialog->ui->comboBoxProtocol->addItem(it.first);
63 
64  if (auto widget = it.second->optionsWidget())
65  {
66  widget->setVisible(false);
67  dialog->ui->layoutOptions->addWidget(widget);
68  }
69  }
70 
71  // load previous values
72  QSettings settings;
73  QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
74  QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
75  QString topics = settings.value("ZMQ_Subscriber::topics", "").toString();
76  bool is_connect = settings.value("ZMQ_Subscriber::is_connect", true).toBool();
77 
78  QString previous_address = address;
79 
80  connect(dialog->ui->radioConnect, &QRadioButton::toggled, dialog, [&](bool toggled) {
81  dialog->ui->lineEditAddress->setEnabled(toggled);
82  if (toggled)
83  {
84  dialog->ui->lineEditAddress->setText(previous_address);
85  }
86  else
87  {
88  previous_address = dialog->ui->lineEditAddress->text();
89  dialog->ui->lineEditAddress->setText("*");
90  }
91  });
92 
93  if (is_connect)
94  {
95  dialog->ui->radioConnect->setChecked(true);
96  }
97  else
98  {
99  dialog->ui->radioBind->setChecked(true);
100  }
101 
102  int port = settings.value("ZMQ_Subscriber::port", 9872).toInt();
103 
104  dialog->ui->lineEditAddress->setText(address);
105  dialog->ui->lineEditPort->setText(QString::number(port));
106  dialog->ui->lineEditTopics->setText(topics);
107 
108  ParserFactoryPlugin::Ptr parser_creator;
109 
110  connect(dialog->ui->comboBoxProtocol,
111  qOverload<const QString&>(&QComboBox::currentIndexChanged), this,
112  [&](const QString& selected_protocol) {
113  if (parser_creator)
114  {
115  if (auto prev_widget = parser_creator->optionsWidget())
116  {
117  prev_widget->setVisible(false);
118  }
119  }
120  parser_creator = parserFactories()->at(selected_protocol);
121 
122  if (auto widget = parser_creator->optionsWidget())
123  {
124  widget->setVisible(true);
125  }
126  });
127 
128  dialog->ui->comboBoxProtocol->setCurrentText(protocol);
129 
130  int res = dialog->exec();
131  if (res == QDialog::Rejected)
132  {
133  _running = false;
134  return false;
135  }
136 
137  address = dialog->ui->lineEditAddress->text();
138  port = dialog->ui->lineEditPort->text().toUShort(&ok);
139  protocol = dialog->ui->comboBoxProtocol->currentText();
140  topics = dialog->ui->lineEditTopics->text();
141  is_connect = dialog->ui->radioConnect->isChecked();
142 
143  _parser = parser_creator->createParser({}, {}, {}, dataMap());
144 
145  // save back to service
146  settings.setValue("ZMQ_Subscriber::address", address);
147  settings.setValue("ZMQ_Subscriber::protocol", protocol);
148  settings.setValue("ZMQ_Subscriber::port", port);
149  settings.setValue("ZMQ_Subscriber::topics", topics);
150  settings.setValue("ZMQ_Subscriber::is_connect", is_connect);
151 
152  QString addr =
153  dialog->ui->comboBox->currentText() + address + ":" + QString::number(port);
154  _socket_address = addr.toStdString();
155  if (is_connect)
156  {
157  _zmq_socket.connect(_socket_address.c_str());
158  }
159  else
160  {
161  _zmq_socket.bind(_socket_address.c_str());
162  }
163 
164  parseTopicFilters(topics);
165  subscribeTopics();
166 
167  _zmq_socket.set(zmq::sockopt::rcvtimeo, 100);
168 
169  qDebug() << "ZMQ listening on address" << QString::fromStdString(_socket_address);
170  _running = true;
171 
172  _receive_thread = std::thread(&DataStreamZMQ::receiveLoop, this);
173 
174  dialog->deleteLater();
175  return _running;
176 }
177 
179 {
180  if (_running)
181  {
182  _running = false;
183 
184  if (_receive_thread.joinable())
185  {
186  _receive_thread.join();
187  }
188 
190 
192  }
193 }
194 
196 {
197  while (_running)
198  {
199  zmq::message_t recv_msg;
200  zmq::recv_result_t result = _zmq_socket.recv(recv_msg);
201 
202  if (recv_msg.size() > 0)
203  {
204  using namespace std::chrono;
205  auto ts = high_resolution_clock::now().time_since_epoch();
206  double timestamp = 1e-6 * double(duration_cast<microseconds>(ts).count());
207 
208  PJ::MessageRef msg(reinterpret_cast<uint8_t*>(recv_msg.data()), recv_msg.size());
209  if (parseMessage(msg, timestamp))
210  {
211  emit this->dataReceived();
212  }
213  }
214  }
215 }
216 
217 bool DataStreamZMQ::parseMessage(const PJ::MessageRef& msg, double& timestamp)
218 {
219  try
220  {
221  std::lock_guard<std::mutex> lock(mutex());
222  _parser->parseMessage(msg, timestamp);
223  return true;
224  }
225  catch (...)
226  {
227  return false;
228  }
229 }
230 
231 void DataStreamZMQ::parseTopicFilters(const QString& topic_filters)
232 {
233  const QRegExp regex("(,{0,1}\\s+)|(;\\s*)");
234 
235  if (topic_filters.trimmed().size() != 0)
236  {
237  const auto splitted = topic_filters.split(regex);
238 
239  for (const auto& topic : splitted)
240  {
241  _topic_filters.push_back(topic.toStdString());
242  }
243  }
244  else
245  {
246  _topic_filters.push_back("");
247  }
248 }
249 
251 {
252  for (const auto& topic : _topic_filters)
253  {
254  qDebug() << "ZMQ Subscribed topic" << QString::fromStdString(topic);
255 
256  _zmq_socket.set(zmq::sockopt::subscribe, topic);
257  }
258 }
259 
261 {
262  for (const auto& topic : _topic_filters)
263  {
264  qDebug() << "ZMQ Unsubscribed topic" << QString::fromStdString(topic);
265 
266  _zmq_socket.set(zmq::sockopt::unsubscribe, topic);
267  }
268 
269  _topic_filters.clear();
270 }
StreamZMQDialog::StreamZMQDialog
StreamZMQDialog(QWidget *parent=nullptr)
Definition: datastream_zmq.cpp:14
datastream_zmq.h
zmq::message_t
Definition: zmq.hpp:389
DataStreamZMQ
Definition: datastream_zmq.h:22
DataStreamZMQ::_parser
PJ::MessageParserPtr _parser
Definition: datastream_zmq.h:56
zmq::message_t::data
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:572
DataStreamZMQ::receiveLoop
void receiveLoop()
Definition: datastream_zmq.cpp:195
DataStreamZMQ::_zmq_socket
zmq::socket_t _zmq_socket
Definition: datastream_zmq.h:55
DataStreamZMQ::_topic_filters
std::vector< std::string > _topic_filters
Definition: datastream_zmq.h:59
mqtt_test_proto.msg
msg
Definition: mqtt_test_proto.py:43
DataStreamZMQ::parseTopicFilters
void parseTopicFilters(const QString &filters)
Definition: datastream_zmq.cpp:231
ok
ROSCPP_DECL bool ok()
DataStreamZMQ::_receive_thread
std::thread _receive_thread
Definition: datastream_zmq.h:58
DataStreamZMQ::_socket_address
std::string _socket_address
Definition: datastream_zmq.h:57
zmq
Definition: zmq.hpp:222
DataStreamZMQ::DataStreamZMQ
DataStreamZMQ()
Definition: datastream_zmq.cpp:31
DataStreamZMQ::subscribeTopics
void subscribeTopics()
Definition: datastream_zmq.cpp:250
detail::count
constexpr auto count() -> size_t
Definition: core.h:1222
start_test_publisher.topic
topic
Definition: start_test_publisher.py:22
Ui
Definition: cheatsheet_dialog.h:6
DataStreamZMQ::unsubscribeTopics
void unsubscribeTopics()
Definition: datastream_zmq.cpp:260
StreamZMQDialog::~StreamZMQDialog
~StreamZMQDialog()
Definition: datastream_zmq.cpp:21
DataStreamZMQ::~DataStreamZMQ
virtual ~DataStreamZMQ() override
Definition: datastream_zmq.cpp:36
PJ::DataStreamer::parserFactories
const ParserFactories * parserFactories() const
Definition: datastreamer_base.cpp:34
DataStreamZMQ::shutdown
virtual void shutdown() override
shutdown Stop streaming
Definition: datastream_zmq.cpp:178
DataStreamZMQ::start
virtual bool start(QStringList *) override
start streaming.
Definition: datastream_zmq.cpp:41
PJ::ParserFactoryPlugin::Ptr
std::shared_ptr< ParserFactoryPlugin > Ptr
Definition: messageparser_base.h:142
DataStreamZMQ::parseMessage
bool parseMessage(const PJ::MessageRef &msg, double &timestamp)
Definition: datastream_zmq.cpp:217
socket_type
int socket_type
zmq::detail::socket_base::disconnect
void disconnect(std::string const &addr)
Definition: zmq.hpp:1941
PJ::DataStreamer::dataReceived
void dataReceived()
PJ::MessageRef
Definition: messageparser_base.h:28
PJ
Definition: dataloader_base.h:16
PJ::DataStreamer::mutex
std::mutex & mutex()
Definition: datastreamer_base.h:62
DataStreamZMQ::_running
bool _running
Definition: datastream_zmq.h:53
StreamZMQDialog::ui
Ui::DataStreamZMQ * ui
Definition: datastream_zmq.h:19
zmq::message_t::size
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:582
messageparser_base.h
StreamZMQDialog
Definition: datastream_zmq.h:11


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Aug 11 2024 02:24:22