datastream_zmq.cpp
Go to the documentation of this file.
1 #include "datastream_zmq.h"
2 #include "ui_datastream_zmq.h"
3 
4 #include <QMessageBox>
5 #include <QDebug>
6 #include <QSettings>
7 #include <QDialog>
8 #include <QIntValidator>
9 #include <chrono>
10 
12  QDialog(parent),
13  ui(new Ui::DataStreamZMQ)
14 {
15  ui->setupUi(this);
16  ui->lineEditPort->setValidator( new QIntValidator() );
17 }
18 
20 {
21  while( ui->layoutOptions->count() > 0)
22  {
23  auto item = ui->layoutOptions->takeAt(0);
24  item->widget()->setParent(nullptr);
25  }
26  delete ui;
27 }
28 
30  _running(false),
31  _zmq_context(),
32  _zmq_socket(_zmq_context, zmq::socket_type::sub)
33 {
34 }
35 
37 {
38  shutdown();
39 }
40 
41 bool DataStreamZMQ::start(QStringList*)
42 {
43  if (_running)
44  {
45  return _running;
46  }
47 
48  if( !availableParsers() )
49  {
50  QMessageBox::warning(nullptr,tr("UDP Server"), tr("No available MessageParsers"), QMessageBox::Ok);
51  _running = false;
52  return false;
53  }
54 
55  bool ok = false;
56 
57  StreamZMQDialog* dialog = new StreamZMQDialog();
58 
59  for( const auto& it: *availableParsers())
60  {
61  dialog->ui->comboBoxProtocol->addItem( it.first );
62 
63  if(auto widget = it.second->optionsWidget() )
64  {
65  widget->setVisible(false);
66  dialog->ui->layoutOptions->addWidget( widget );
67  }
68  }
69 
70  // load previous values
71  QSettings settings;
72  QString address = settings.value("ZMQ_Subscriber::address", "localhost").toString();
73  QString protocol = settings.value("ZMQ_Subscriber::protocol", "JSON").toString();
74  int port = settings.value("ZMQ_Subscriber::port", 9872).toInt();
75 
76 
77  dialog->ui->lineEditAddress->setText( address );
78  dialog->ui->lineEditPort->setText( QString::number(port) );
79 
80  std::shared_ptr<PJ::MessageParserCreator> parser_creator;
81 
82  connect(dialog->ui->comboBoxProtocol, qOverload<const QString &>( &QComboBox::currentIndexChanged), this,
83  [&](QString protocol)
84  {
85  if( parser_creator ){
86  QWidget* prev_widget = parser_creator->optionsWidget();
87  prev_widget->setVisible(false);
88  }
89  parser_creator = availableParsers()->at( protocol );
90 
91  if(auto widget = parser_creator->optionsWidget() ){
92  widget->setVisible(true);
93  }
94  });
95 
96  dialog->ui->comboBoxProtocol->setCurrentText(protocol);
97 
98  int res = dialog->exec();
99  if( res == QDialog::Rejected )
100  {
101  _running = false;
102  return false;
103  }
104 
105  address = dialog->ui->lineEditAddress->text();
106  port = dialog->ui->lineEditPort->text().toUShort(&ok);
107  protocol = dialog->ui->comboBoxProtocol->currentText();
108 
109  _parser = parser_creator->createInstance({}, dataMap());
110 
111  // save back to service
112  settings.setValue("ZMQ_Subscriber::address", address);
113  settings.setValue("ZMQ_Subscriber::protocol", protocol);
114  settings.setValue("ZMQ_Subscriber::port", port);
115 
117  (dialog->ui->comboBox->currentText()+
118  address+ ":" + QString::number(port)).toStdString();
119 
121  // subscribe to everything
122  _zmq_socket.set(zmq::sockopt::subscribe, "");
123  _zmq_socket.set(zmq::sockopt::rcvtimeo, 100);
124 
125  qDebug() << "ZMQ listening on address" << QString::fromStdString( _socket_address );
126  _running = true;
127 
128  _receive_thread = std::thread(&DataStreamZMQ::receiveLoop, this);
129 
130  dialog->deleteLater();
131  return _running;
132 }
133 
135 {
136  if( _running )
137  {
138  _running = false;
139  if( _receive_thread.joinable() )
140  {
141  _receive_thread.join();
142  }
144  _running = false;
145  }
146 }
147 
149 {
150  while( _running )
151  {
152  zmq::message_t recv_msg;
153  zmq::recv_result_t result = _zmq_socket.recv(recv_msg);
154 
155  if( recv_msg.size() > 0 )
156  {
157  using namespace std::chrono;
158  auto ts = high_resolution_clock::now().time_since_epoch();
159  double timestamp = 1e-6* double( duration_cast<microseconds>(ts).count() );
160 
161  PJ::MessageRef msg ( reinterpret_cast<uint8_t*>(recv_msg.data()), recv_msg.size() );
162 
163  try {
164  std::lock_guard<std::mutex> lock(mutex());
165  _parser->parseMessage(msg, timestamp);
166  } catch (std::exception& err)
167  {
168  QMessageBox::warning(nullptr,
169  tr("ZMQ Subscriber"),
170  tr("Problem parsing the message. ZMQ Subscriber will be stopped.\n%1").arg(err.what()),
171  QMessageBox::Ok);
172 
174  _running = false;
175  // notify the GUI
176  emit closed();
177  return;
178  }
179  }
180  }
181 }
182 
183 
virtual void shutdown() override
zmq::socket_t _zmq_socket
virtual ~DataStreamZMQ() override
MessageParserFactory * availableParsers()
void * data() ZMQ_NOTHROW
Definition: zmq.hpp:561
std::mutex & mutex()
constexpr size_t count()
Definition: core.h:960
Ui::DataStreamZMQ * ui
std::thread _receive_thread
PlotDataMapRef & dataMap()
detail::named_arg< Char, T > arg(const Char *name, const T &arg)
Definition: core.h:1656
PJ::MessageParserPtr _parser
virtual bool start(QStringList *) override
std::string _socket_address
void connect(std::string const &addr)
Definition: zmq.hpp:1812
size_t size() const ZMQ_NOTHROW
Definition: zmq.hpp:568
StreamZMQDialog(QWidget *parent=nullptr)
void disconnect(std::string const &addr)
Definition: zmq.hpp:1821
Definition: zmq.hpp:224
def timestamp()
Definition: mqttsas.py:143


plotjuggler
Author(s): Davide Faconti
autogenerated on Sun Dec 6 2020 03:47:34