datastream_ROS.cpp
Go to the documentation of this file.
1 #include "datastream_ROS.h"
2 #include <QTextStream>
3 #include <QFile>
4 #include <QMessageBox>
5 #include <QDebug>
6 #include <thread>
7 #include <mutex>
8 #include <chrono>
9 #include <thread>
10 #include <QProgressDialog>
11 #include <QtGlobal>
12 #include <QApplication>
13 #include <QProcess>
14 #include <QCheckBox>
15 #include <QSettings>
16 #include <QFileDialog>
17 #include <ros/callback_queue.h>
18 #include <rosbag/bag.h>
20 #include <ros/transport_hints.h>
21 
23 #include "rule_editing.h"
24 #include "qnodedialog.h"
26 
28 , _action_saveIntoRosbag(nullptr)
29 , _prev_clock_time(0)
30 {
31  _running = false;
32  _first_warning = true;
33  _periodic_timer = new QTimer();
34  connect(_periodic_timer, &QTimer::timeout, this, &DataStreamROS::timerCallback);
35 
36  {
37  QSettings settings;
38  _config.loadFromSettings(settings, "DataStreamROS");
39  }
40 
41  _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"));
42  connect(_action_saveIntoRosbag, &QAction::changed, this, [this]() {
44  });
45 
46  _action_saveAny = new QAction(QString("Store messages in memory for Re-publishing"));
47  connect(_action_saveAny, &QAction::triggered, this, [this]() {
48  QSettings settings;
49  settings.value("DataStreamROS/storemessagesInMemory", _action_saveAny->isChecked() );
50  });
51  _action_saveAny->setCheckable( true );
52 
53  QSettings settings;
54  bool store_msg = settings.value("DataStreamROS/storemessagesInMemory", false).toBool();
55  _action_saveAny->setChecked( store_msg );
56 
58  _available_actions.push_back( _action_saveAny );
59 }
60 
61 void DataStreamROS::topicCallback(const RosIntrospection::ShapeShifter::ConstPtr& msg, const std::string& topic_name)
62 {
63  if (!_running)
64  {
65  return;
66  }
67 
68  emit dataReceived();
69 
70  using namespace RosIntrospection;
71  const auto& md5sum = msg->getMD5Sum();
72  const auto& datatype = msg->getDataType();
73  const auto& definition = msg->getMessageDefinition();
74 
75  // register the message type
76  _parser->registerMessageType(topic_name, datatype, definition);
77 
79 
80  //------------------------------------
81  std::vector<uint8_t> buffer;
82 
83  buffer.resize(msg->size());
84 
85  ros::serialization::OStream stream(buffer.data(), buffer.size());
86  msg->write(stream);
87 
88  double msg_time = ros::Time::now().toSec();
89  if (msg_time == 0)
90  {
91  // corner case: use_sim_time == true but topic /clock is not published
92  msg_time = ros::WallTime::now().toSec();
93 
94  auto tmp_config = _config;
95  tmp_config.use_header_stamp = false;
96  _parser->setConfig(tmp_config);
97  }
98 
99  // time wrapping may happen using use_sim_time = true and
100  // rosbag play --clock --loop
101  if (msg_time < _prev_clock_time)
102  {
103  // clear
104  for (auto& it : dataMap().numeric)
105  {
106  it.second.clear();
107  }
108  for (auto& it : _user_defined_buffers)
109  {
110  it.second.clear();
111  }
112  emit clearBuffers();
113  }
114  _prev_clock_time = msg_time;
115 
116  MessageRef buffer_view(buffer);
117 
118  try
119  {
120  // before pushing, lock the mutex
121  std::lock_guard<std::mutex> lock(mutex());
122  _parser->parseMessage(topic_name, buffer_view, msg_time);
123  }
124  catch (std::runtime_error& ex)
125  {
126  if( _first_warning ) {
127  _first_warning = false;
128  QMessageBox::warning(nullptr, tr("Error"),
129  QString("rosbag::open thrown an exception:\n") +
130  QString(ex.what()) +
131  "\nThis message will be shown only once.");
132  }
133  }
134 
135  const std::string prefixed_topic_name = _prefix + topic_name;
136 
137  bool save_any = _action_saveAny->isChecked();
138  // adding raw serialized msg for future uses.
139  // do this before msg_time normalization
140  if( save_any )
141  {
142  auto plot_pair = _user_defined_buffers.find(prefixed_topic_name);
143  if (plot_pair == _user_defined_buffers.end())
144  {
145  plot_pair = _user_defined_buffers.emplace(
146  std::piecewise_construct,
147  std::forward_as_tuple(prefixed_topic_name),
148  std::forward_as_tuple(prefixed_topic_name, PlotGroup::Ptr())).first;
149  }
150  PlotDataAny& user_defined_data = plot_pair->second;
151  user_defined_data.pushBack(PlotDataAny::Point(msg_time, std::any(std::move(buffer))));
152  }
153 
154  //------------------------------
155  if( save_any )
156  {
157  int& index = _msg_index[topic_name];
158  index++;
159  const std::string key = prefixed_topic_name + ("/_MSG_INDEX_");
160  auto index_it = dataMap().numeric.find(key);
161  if (index_it == dataMap().numeric.end())
162  {
163  index_it = dataMap().addNumeric(key);
164  }
165  index_it->second.pushBack(PlotData::Point(msg_time, index));
166  }
167 }
168 
170 {
171  using namespace std::chrono;
172  milliseconds wait_time_ms(1000);
173 
174  QProgressDialog progress_dialog;
175  progress_dialog.setLabelText("Collecting ROS topic samples to understand data layout. ");
176  progress_dialog.setRange(0, wait_time_ms.count());
177  progress_dialog.setAutoClose(true);
178  progress_dialog.setAutoReset(true);
179 
180  progress_dialog.show();
181 
182  auto start_time = system_clock::now();
183 
184  while (system_clock::now() - start_time < (wait_time_ms))
185  {
187  int i = duration_cast<milliseconds>(system_clock::now() - start_time).count();
188  progress_dialog.setValue(i);
189  QApplication::processEvents();
190  if (progress_dialog.wasCanceled())
191  {
192  break;
193  }
194  }
195 
196  if (progress_dialog.wasCanceled() == false)
197  {
198  progress_dialog.cancel();
199  }
200 }
201 
203 {
204  if (_running && ros::master::check() == false)
205  {
206  auto ret = QMessageBox::warning(nullptr, tr("Disconnected!"),
207  tr("The roscore master is not reachable anymore.\n\n"
208  "Do you want to try reconnecting to it? \n"),
209  tr("Stop Streaming"), tr("Try reconnect"), nullptr);
210  if (ret == 1)
211  {
212  this->shutdown();
214 
215  if (!_node)
216  {
217  emit closed();
218  return;
219  }
220  _parser.reset( new RosCompositeParser(dataMap()) );
221  subscribe();
222 
223  _running = true;
224  _spinner = std::make_shared<ros::AsyncSpinner>(1);
225  _spinner->start();
226  _periodic_timer->start();
227  }
228  else if (ret == 0)
229  {
230  this->shutdown();
231  emit closed();
232  }
233  }
234 
235  if( !ros::ok() )
236  {
237  QMessageBox::warning(nullptr, tr("ROS Stopped"), "The plugin will be stopped");
238  this->shutdown();
239 
240  emit closed();
241  }
242 }
243 
245 {
246  if (_user_defined_buffers.empty())
247  {
248  QMessageBox::warning(nullptr, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n"));
249  return;
250  }
251 
252  QFileDialog saveDialog;
253  saveDialog.setAcceptMode(QFileDialog::AcceptSave);
254  saveDialog.setDefaultSuffix("bag");
255  saveDialog.exec();
256 
257  if (saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
258  {
259  return;
260  }
261 
262  QString fileName = saveDialog.selectedFiles().first();
263 
264  if (fileName.size() > 0)
265  {
266  rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write);
267 
268  for (const auto& it : _user_defined_buffers)
269  {
270  const std::string& topicname = it.first;
271  const auto& plotdata = it.second;
272 
273  auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
274  if (!registered_msg_type){
275  continue;
276  }
277 
279  msg.morph(registered_msg_type->getMD5Sum(), registered_msg_type->getDataType(),
280  registered_msg_type->getMessageDefinition());
281 
282  for (int i = 0; i < plotdata.size(); i++)
283  {
284  const auto& point = plotdata.at(i);
285  const double msg_time = point.x;
286  const std::any& type_erased_buffer = point.y;
287 
288  if (type_erased_buffer.type() != typeid(std::vector<uint8_t>))
289  {
290  // can't cast to expected type
291  continue;
292  }
293 
294  std::vector<uint8_t> raw_buffer = std::any_cast<std::vector<uint8_t>>(type_erased_buffer);
295  ros::serialization::IStream stream(raw_buffer.data(), raw_buffer.size());
296  msg.read(stream);
297 
298  rosbag.write(topicname, ros::Time(msg_time), msg);
299  }
300  }
301  rosbag.close();
302 
303  QProcess process;
304  QStringList args;
305  args << "reindex" << fileName;
306  process.start("rosbag", args);
307  }
308 }
309 
311 {
312  _subscribers.clear();
313 
314  for (int i = 0; i < _config.topics.size(); i++)
315  {
316  const std::string topic_name = _config.topics[i].toStdString();
317  boost::function<void(const RosIntrospection::ShapeShifter::ConstPtr&)> callback;
318  callback = [this, topic_name](const RosIntrospection::ShapeShifter::ConstPtr& msg) -> void {
319  this->topicCallback(msg, topic_name);
320  };
321 
323  ops.initByFullCallbackType(topic_name, 1, callback);
325 
326  _subscribers.insert({ topic_name, _node->subscribe(ops) });
327  }
328 }
329 
330 bool DataStreamROS::start(QStringList* selected_datasources)
331 {
332  if (!_node)
333  {
335  }
336 
337  if (!_node)
338  {
339  return false;
340  }
341 
342  {
343  std::lock_guard<std::mutex> lock(mutex());
344  dataMap().numeric.clear();
345  dataMap().user_defined.clear();
346  _parser.reset( new RosCompositeParser(dataMap()) );
347  }
348 
349  using namespace RosIntrospection;
350 
351  std::vector<std::pair<QString, QString>> all_topics;
352  ros::master::V_TopicInfo topic_infos;
353  ros::master::getTopics(topic_infos);
354  for (ros::master::TopicInfo topic_info : topic_infos)
355  {
356  all_topics.push_back(std::make_pair(QString(topic_info.name.c_str()), QString(topic_info.datatype.c_str())));
357  }
358 
359  QTimer timer;
360  timer.setSingleShot(false);
361  timer.setInterval(1000);
362  timer.start();
363 
364  DialogSelectRosTopics dialog(all_topics, _config);
365 
366  connect(&timer, &QTimer::timeout, [&]() {
367  all_topics.clear();
368  topic_infos.clear();
369  ros::master::getTopics(topic_infos);
370  for (ros::master::TopicInfo topic_info : topic_infos)
371  {
372  all_topics.push_back({ QString::fromStdString(topic_info.name), QString::fromStdString(topic_info.datatype) });
373  }
374  dialog.updateTopicList(all_topics);
375  });
376 
377  int res = dialog.exec();
378  _config = dialog.getResult();
379  timer.stop();
380 
381  if (res != QDialog::Accepted || _config.topics.empty())
382  {
383  return false;
384  }
385 
386  {
387  QSettings settings;
388  _config.saveToSettings(settings, "DataStreamROS");
389  }
390 
391  _parser->setConfig(_config);
392 
393  //-------------------------
394  subscribe();
395  _running = true;
396  _first_warning = true;
397 
399 
400  _spinner = std::make_shared<ros::AsyncSpinner>(1);
401  _spinner->start();
402 
403  _periodic_timer->setInterval(500);
404  _periodic_timer->start();
405 
406  return true;
407 }
408 
410 {
411  return _running;
412 }
413 
415 {
416  _periodic_timer->stop();
417 
418  if (_spinner)
419  {
420  _spinner->stop();
421  }
422  _spinner.reset();
423 
424  if(_node ){
425  _node->shutdown();
426  }
427  _node.reset();
428 
429  _subscribers.clear();
430  _running = false;
431  _parser.reset();
432 }
433 
435 {
436  shutdown();
437 }
438 
439 bool DataStreamROS::xmlSaveState(QDomDocument& doc, QDomElement& plugin_elem) const
440 {
441  _config.xmlSaveState(doc, plugin_elem);
442  return true;
443 }
444 
445 bool DataStreamROS::xmlLoadState(const QDomElement& parent_element)
446 {
447  _config.xmlLoadState(parent_element);
448  return true;
449 }
450 
451 const std::vector<QAction *> &DataStreamROS::availableActions()
452 {
453  return _available_actions;
454 }
455 
void topicCallback(const RosIntrospection::ShapeShifter::ConstPtr &msg, const std::string &topic_name)
void xmlLoadState(const QDomElement &parent_element)
void xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
virtual const std::vector< QAction * > & availableActions() override
QTimer * _periodic_timer
ROSCPP_DECL bool check()
void initByFullCallbackType(const std::string &_topic, uint32_t _queue_size, const boost::function< void(P)> &_callback, const boost::function< boost::shared_ptr< typename ParameterAdapter< P >::Message >(void)> &factory_fn=DefaultMessageCreator< typename ParameterAdapter< P >::Message >())
static void registerMessage(const std::string &topic_name, const std::string &md5sum, const std::string &datatype, const std::string &definition)
void updateTopicList(std::vector< std::pair< QString, QString >> topic_list)
#define nullptr
std::shared_ptr< PlotGroup > Ptr
ros::NodeHandlePtr _node
void extractInitialSamples()
The ShapeShifter class is a type erased container for ROS Messages. It can be used also to create gen...
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
static RosIntrospectionFactory & get()
std::vector< TopicInfo > V_TopicInfo
std::string _prefix
virtual void shutdown() override
std::mutex & mutex()
std::unordered_map< std::string, PlotDataAny > _user_defined_buffers
void morph(const std::string &md5sum, const std::string &datatype_, const std::string &msg_def_)
std::vector< QAction * > _available_actions
std::unique_ptr< RosCompositeParser > _parser
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
TransportHints & tcpNoDelay(bool nodelay=true)
TimeseriesMap numeric
virtual bool start(QStringList *selected_datasources) override
std::map< std::string, int > _msg_index
const char * datatype()
constexpr auto count() -> size_t
virtual bool xmlLoadState(const QDomElement &parent_element) override
ret
ROSCPP_DECL bool ok()
iterator it
TimeseriesMap::iterator addNumeric(const std::string &name, PlotGroup::Ptr group={})
std::size_t index
static ros::NodeHandlePtr getNode()
TransportHints transport_hints
PlotDataMapRef & dataMap()
virtual ~DataStreamROS() override
QAction * _action_saveAny
size_t & i
void saveToSettings(QSettings &setting, QString prefix) const
typename PlotDataBase< double, Value >::Point Point
double _prev_clock_time
void pushBack(const Point &p) override
static WallTime now()
virtual bool xmlSaveState(QDomDocument &doc, QDomElement &parent_element) const override
static RosIntrospection::ShapeShifter * getShapeShifter(const std::string &topic_name)
static Time now()
AnySeriesMap user_defined
const char * md5sum()
PJ::RosParserConfig getResult() const
virtual bool isRunning() const override
const char * definition()
RosParserConfig _config
void loadFromSettings(const QSettings &settings, QString prefix)
std::map< std::string, ros::Subscriber > _subscribers
numeric
msg
std::shared_ptr< ros::AsyncSpinner > _spinner
QAction * _action_saveIntoRosbag


plotjuggler_ros
Author(s): Davide Faconti
autogenerated on Fri Jun 23 2023 02:28:03