datastream_ROS.cpp
Go to the documentation of this file.
1 #include "datastream_ROS.h"
2 #include <QTextStream>
3 #include <QFile>
4 #include <QMessageBox>
5 #include <QDebug>
6 #include <thread>
7 #include <mutex>
8 #include <chrono>
9 #include <thread>
10 #include <QProgressDialog>
11 #include <QtGlobal>
12 #include <QApplication>
13 #include <QProcess>
14 #include <QCheckBox>
15 #include <QSettings>
16 #include <QFileDialog>
17 #include <ros/callback_queue.h>
18 #include <rosbag/bag.h>
20 #include <ros/transport_hints.h>
21 
23 #include "qnodedialog.h"
25 
26 using PJ::PlotDataAny;
27 using PJ::PlotData;
28 
29 DataStreamROS::DataStreamROS() : DataStreamer(), _node(nullptr)
30 , _action_saveIntoRosbag(nullptr)
31 , _prev_clock_time(0)
32 {
33  _running = false;
34  _first_warning = true;
35  _periodic_timer = new QTimer();
36  connect(_periodic_timer, &QTimer::timeout, this, &DataStreamROS::timerCallback);
37 
38  {
39  QSettings settings;
40  _config.loadFromSettings(settings, "DataStreamROS");
41  }
42 
43  _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"));
44  connect(_action_saveIntoRosbag, &QAction::changed, this, [this]() {
46  });
47 
48  _action_saveAny = new QAction(QString("Store messages in memory for Re-publishing"));
49  connect(_action_saveAny, &QAction::triggered, this, [this]() {
50  QSettings settings;
51  settings.value("DataStreamROS/storemessagesInMemory", _action_saveAny->isChecked() );
52  });
53  _action_saveAny->setCheckable( true );
54 
55  QSettings settings;
56  bool store_msg = settings.value("DataStreamROS/storemessagesInMemory", false).toBool();
57  _action_saveAny->setChecked( store_msg );
58 
61 }
62 
63 void DataStreamROS::topicCallback(const RosIntrospection::ShapeShifter::ConstPtr& msg, const std::string& topic_name)
64 {
65  if (!_running)
66  {
67  return;
68  }
69 
70  emit dataReceived();
71 
72  using namespace RosIntrospection;
73  const auto& md5sum = msg->getMD5Sum();
74  const auto& datatype = msg->getDataType();
75  const auto& definition = msg->getMessageDefinition();
76 
77  // register the message type
78  if(!_parser.hasParser(topic_name))
79  {
80  auto ros_parser = CreateParserROS(*parserFactories(), topic_name, datatype, definition, dataMap());
81  _parser.addParser(topic_name, ros_parser);
83  }
84 
85  //------------------------------------
86  std::vector<uint8_t> buffer;
87  buffer.resize(msg->size());
88 
90  msg->write(stream);
91 
92  double msg_time = ros::Time::now().toSec();
93  if (msg_time == 0)
94  {
95  // corner case: use_sim_time == true but topic /clock is not published
96  msg_time = ros::WallTime::now().toSec();
97 
98  auto tmp_config = _config;
99  tmp_config.use_header_stamp = false;
100  _parser.setConfig(tmp_config);
101  }
102 
103  // time wrapping may happen using use_sim_time = true and
104  // rosbag play --clock --loop
105  if (msg_time < _prev_clock_time)
106  {
107  // clear
108  for (auto& it : dataMap().numeric)
109  {
110  it.second.clear();
111  }
112  for (auto& it : _user_defined_buffers)
113  {
114  it.second.clear();
115  }
116  emit clearBuffers();
117  }
118  _prev_clock_time = msg_time;
119 
120  PJ::MessageRef buffer_view(buffer);
121 
122  try
123  {
124  // before pushing, lock the mutex
125  std::lock_guard<std::mutex> lock(mutex());
126  _parser.parseMessage(topic_name, buffer_view, msg_time);
127  }
128  catch (std::runtime_error& ex)
129  {
130  if( _first_warning ) {
131  _first_warning = false;
132  QMessageBox::warning(nullptr, tr("Error"),
133  QString("rosbag::open thrown an exception:\n") +
134  QString(ex.what()) +
135  "\nThis message will be shown only once.");
136  }
137  }
138 
139  const std::string prefixed_topic_name = _prefix + topic_name;
140 
141  bool save_any = _action_saveAny->isChecked();
142  // adding raw serialized msg for future uses.
143  // do this before msg_time normalization
144  if( save_any )
145  {
146  auto plot_pair = _user_defined_buffers.find(prefixed_topic_name);
147  if (plot_pair == _user_defined_buffers.end())
148  {
149  plot_pair = _user_defined_buffers.emplace(
150  std::piecewise_construct,
151  std::forward_as_tuple(prefixed_topic_name),
152  std::forward_as_tuple(prefixed_topic_name, PJ::PlotGroup::Ptr())).first;
153  }
154  PlotDataAny& user_defined_data = plot_pair->second;
155  user_defined_data.pushBack(PlotDataAny::Point(msg_time, std::any(std::move(buffer))));
156  }
157 
158  //------------------------------
159  if( save_any )
160  {
161  int& index = _msg_index[topic_name];
162  index++;
163  const std::string key = prefixed_topic_name + ("/_MSG_INDEX_");
164  auto index_it = dataMap().numeric.find(key);
165  if (index_it == dataMap().numeric.end())
166  {
167  index_it = dataMap().addNumeric(key);
168  }
169  index_it->second.pushBack(PlotData::Point(msg_time, index));
170  }
171 }
172 
174 {
175  using namespace std::chrono;
176  milliseconds wait_time_ms(1000);
177 
178  QProgressDialog progress_dialog;
179  progress_dialog.setLabelText("Collecting ROS topic samples to understand data layout. ");
180  progress_dialog.setRange(0, wait_time_ms.count());
181  progress_dialog.setAutoClose(true);
182  progress_dialog.setAutoReset(true);
183 
184  progress_dialog.show();
185 
186  auto start_time = system_clock::now();
187 
188  while (system_clock::now() - start_time < (wait_time_ms))
189  {
191  int i = duration_cast<milliseconds>(system_clock::now() - start_time).count();
192  progress_dialog.setValue(i);
193  QApplication::processEvents();
194  if (progress_dialog.wasCanceled())
195  {
196  break;
197  }
198  }
199 
200  if (progress_dialog.wasCanceled() == false)
201  {
202  progress_dialog.cancel();
203  }
204 }
205 
207 {
208  if (_running && ros::master::check() == false)
209  {
210  auto ret = QMessageBox::warning(nullptr, tr("Disconnected!"),
211  tr("The roscore master is not reachable anymore.\n\n"
212  "Do you want to try reconnecting to it? \n"),
213  tr("Stop Streaming"), tr("Try reconnect"), nullptr);
214  if (ret == 1)
215  {
216  this->shutdown();
218 
219  if (!_node)
220  {
221  emit closed();
222  return;
223  }
224  _parser.clear();
225  subscribe();
226 
227  _running = true;
228  _spinner = std::make_shared<ros::AsyncSpinner>(1);
229  _spinner->start();
230  _periodic_timer->start();
231  }
232  else if (ret == 0)
233  {
234  this->shutdown();
235  emit closed();
236  }
237  }
238 
239  if( !ros::ok() )
240  {
241  QMessageBox::warning(nullptr, tr("ROS Stopped"), "The plugin will be stopped");
242  this->shutdown();
243 
244  emit closed();
245  }
246 }
247 
249 {
250  if (_user_defined_buffers.empty())
251  {
252  QMessageBox::warning(nullptr, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n"));
253  return;
254  }
255 
256  QFileDialog saveDialog;
257  saveDialog.setAcceptMode(QFileDialog::AcceptSave);
258  saveDialog.setDefaultSuffix("bag");
259  saveDialog.exec();
260 
261  if (saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
262  {
263  return;
264  }
265 
266  QString fileName = saveDialog.selectedFiles().first();
267 
268  if (fileName.size() > 0)
269  {
270  rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write);
271 
272  for (const auto& it : _user_defined_buffers)
273  {
274  const std::string& topicname = it.first;
275  const auto& plotdata = it.second;
276 
277  auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
278  if (!registered_msg_type){
279  continue;
280  }
281 
283  msg.morph(registered_msg_type->getMD5Sum(), registered_msg_type->getDataType(),
284  registered_msg_type->getMessageDefinition());
285 
286  for (int i = 0; i < plotdata.size(); i++)
287  {
288  const auto& point = plotdata.at(i);
289  const double msg_time = point.x;
290  const std::any& type_erased_buffer = point.y;
291 
292  if (type_erased_buffer.type() != typeid(std::vector<uint8_t>))
293  {
294  // can't cast to expected type
295  continue;
296  }
297 
298  std::vector<uint8_t> raw_buffer = std::any_cast<std::vector<uint8_t>>(type_erased_buffer);
299  ros::serialization::IStream stream(raw_buffer.data(), raw_buffer.size());
300  msg.read(stream);
301 
302  rosbag.write(topicname, ros::Time(msg_time), msg);
303  }
304  }
305  rosbag.close();
306 
307  QProcess process;
308  QStringList args;
309  args << "reindex" << fileName;
310  process.start("rosbag", args);
311  }
312 }
313 
315 {
316  _subscribers.clear();
317 
318  for (int i = 0; i < _config.topics.size(); i++)
319  {
320  const std::string topic_name = _config.topics[i].toStdString();
321  boost::function<void(const RosIntrospection::ShapeShifter::ConstPtr&)> callback;
322  callback = [this, topic_name](const RosIntrospection::ShapeShifter::ConstPtr& msg) -> void {
323  this->topicCallback(msg, topic_name);
324  };
325 
327  ops.initByFullCallbackType(topic_name, 1, callback);
329 
330  _subscribers.insert({ topic_name, _node->subscribe(ops) });
331  }
332 }
333 
334 bool DataStreamROS::start(QStringList* selected_datasources)
335 {
336  if (!_node)
337  {
339  }
340 
341  if (!_node)
342  {
343  return false;
344  }
345 
346  {
347  std::lock_guard<std::mutex> lock(mutex());
348  dataMap().numeric.clear();
349  dataMap().user_defined.clear();
350  _parser.clear();
351  }
352 
353  using namespace RosIntrospection;
354 
355  std::vector<std::pair<QString, QString>> all_topics;
356  ros::master::V_TopicInfo topic_infos;
357  ros::master::getTopics(topic_infos);
358  for (ros::master::TopicInfo topic_info : topic_infos)
359  {
360  all_topics.push_back(std::make_pair(QString(topic_info.name.c_str()), QString(topic_info.datatype.c_str())));
361  }
362 
363  QTimer timer;
364  timer.setSingleShot(false);
365  timer.setInterval(1000);
366  timer.start();
367 
368  DialogSelectRosTopics dialog(all_topics, _config);
369 
370  connect(&timer, &QTimer::timeout, [&]() {
371  all_topics.clear();
372  topic_infos.clear();
373  ros::master::getTopics(topic_infos);
374  for (ros::master::TopicInfo topic_info : topic_infos)
375  {
376  all_topics.push_back({ QString::fromStdString(topic_info.name), QString::fromStdString(topic_info.datatype) });
377  }
378  dialog.updateTopicList(all_topics);
379  });
380 
381  int res = dialog.exec();
382  _config = dialog.getResult();
383  timer.stop();
384 
385  if (res != QDialog::Accepted || _config.topics.empty())
386  {
387  return false;
388  }
389 
390  {
391  QSettings settings;
392  _config.saveToSettings(settings, "DataStreamROS");
393  }
394 
396 
397  //-------------------------
398  subscribe();
399  _running = true;
400  _first_warning = true;
401 
403 
404  _spinner = std::make_shared<ros::AsyncSpinner>(1);
405  _spinner->start();
406 
407  _periodic_timer->setInterval(500);
408  _periodic_timer->start();
409 
410  return true;
411 }
412 
414 {
415  return _running;
416 }
417 
419 {
420  _periodic_timer->stop();
421 
422  if (_spinner)
423  {
424  _spinner->stop();
425  }
426  _spinner.reset();
427 
428  if(_node ){
429  _node->shutdown();
430  }
431  _node.reset();
432 
433  _subscribers.clear();
434  _running = false;
435  _parser.clear();
436 }
437 
439 {
440  shutdown();
441 }
442 
443 bool DataStreamROS::xmlSaveState(QDomDocument& doc, QDomElement& plugin_elem) const
444 {
445  _config.xmlSaveState(doc, plugin_elem);
446  return true;
447 }
448 
449 bool DataStreamROS::xmlLoadState(const QDomElement& parent_element)
450 {
451  _config.xmlLoadState(parent_element);
452  return true;
453 }
454 
455 const std::vector<QAction *> &DataStreamROS::availableActions()
456 {
457  return _available_actions;
458 }
459 
buffer::data
auto data() const FMT_NOEXCEPT -> const T *
PJ::RosParserConfig::use_header_stamp
bool use_header_stamp
Definition: parser_configuration.h:18
ros::master::check
ROSCPP_DECL bool check()
ros::serialization::OStream
numeric
numeric
PJ::TimeseriesBase
md5sum
const char * md5sum()
DataStreamROS::_action_saveIntoRosbag
QAction * _action_saveIntoRosbag
Definition: datastream_ROS.h:68
rosbag::Bag
DataStreamROS::saveIntoRosbag
void saveIntoRosbag()
Definition: datastream_ROS.cpp:248
qnodedialog.h
DataStreamROS::timerCallback
void timerCallback()
Definition: datastream_ROS.cpp:206
boost::shared_ptr
PJ::RosParserConfig::saveToSettings
void saveToSettings(QSettings &setting, QString prefix) const
Definition: parser_configuration.cpp:58
DataStreamROS::_user_defined_buffers
std::unordered_map< std::string, PJ::PlotDataAny > _user_defined_buffers
Definition: datastream_ROS.h:84
definition
const char * definition()
DataStreamROS::_spinner
std::shared_ptr< ros::AsyncSpinner > _spinner
Definition: datastream_ROS.h:56
RosIntrospectionFactory::get
static RosIntrospectionFactory & get()
Definition: shape_shifter_factory.hpp:37
DataStreamROS::_config
PJ::RosParserConfig _config
Definition: datastream_ROS.h:76
DataStreamROS::~DataStreamROS
virtual ~DataStreamROS() override
Definition: datastream_ROS.cpp:438
PJ::TimeseriesBase::pushBack
void pushBack(const Point &p) override
PJ::CompositeParser::setConfig
void setConfig(const RosParserConfig &config)
Definition: parser_configuration.cpp:90
index
std::size_t index
PJ::PlotDataMapRef::addNumeric
TimeseriesMap::iterator addNumeric(const std::string &name, PlotGroup::Ptr group={})
datastream_ROS.h
DataStreamROS::xmlSaveState
virtual bool xmlSaveState(QDomDocument &doc, QDomElement &parent_element) const override
Definition: datastream_ROS.cpp:443
ros::SubscribeOptions::transport_hints
TransportHints transport_hints
DataStreamROS::_prev_clock_time
double _prev_clock_time
Definition: datastream_ROS.h:82
RosIntrospectionFactory::registerMessage
static void registerMessage(const std::string &topic_name, const std::string &md5sum, const std::string &datatype, const std::string &definition)
Definition: shape_shifter_factory.hpp:44
ros::TransportHints
TimeBase< Time, Duration >::toSec
double toSec() const
DataStreamROS::subscribe
void subscribe()
Definition: datastream_ROS.cpp:314
i
size_t & i
buffer
dialog_select_ros_topics.h
DataStreamROS::_subscribers
std::map< std::string, ros::Subscriber > _subscribers
Definition: datastream_ROS.h:64
msg
msg
ros::serialization::IStream
DataStreamROS::_available_actions
std::vector< QAction * > _available_actions
Definition: datastream_ROS.h:72
bag.h
ros::master::getTopics
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
ros::ok
ROSCPP_DECL bool ok()
ros::TransportHints::tcpNoDelay
TransportHints & tcpNoDelay(bool nodelay=true)
DataStreamROS::xmlLoadState
virtual bool xmlLoadState(const QDomElement &parent_element) override
Definition: datastream_ROS.cpp:449
PJ::PlotDataMapRef::numeric
TimeseriesMap numeric
DataStreamROS::extractInitialSamples
void extractInitialSamples()
Definition: datastream_ROS.cpp:173
DataStreamROS::isRunning
virtual bool isRunning() const override
Definition: datastream_ROS.cpp:413
DialogSelectRosTopics
Definition: dialog_select_ros_topics.h:19
PJ::PlotGroup::Ptr
std::shared_ptr< PlotGroup > Ptr
transport_hints.h
DialogSelectRosTopics::updateTopicList
void updateTopicList(std::vector< std::pair< QString, QString >> topic_list)
Definition: dialog_select_ros_topics.cpp:81
DataStreamROS::_first_warning
bool _first_warning
Definition: datastream_ROS.h:54
PJ::RosParserConfig::loadFromSettings
void loadFromSettings(const QSettings &settings, QString prefix)
Definition: parser_configuration.cpp:68
PJ::RosParserConfig::xmlLoadState
void xmlLoadState(const QDomElement &parent_element)
Definition: parser_configuration.cpp:35
ros::master::V_TopicInfo
std::vector< TopicInfo > V_TopicInfo
PJ::RosParserConfig::xmlSaveState
void xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
Definition: parser_configuration.cpp:8
ros::WallTime::now
static WallTime now()
PJ::PlotData
TimeseriesBase< double > PlotData
DataStreamROS::start
virtual bool start(QStringList *selected_datasources) override
Definition: datastream_ROS.cpp:334
PJ::CompositeParser::hasParser
bool hasParser(const std::string &topic_name)
Definition: parser_configuration.h:46
DataStreamROS::shutdown
virtual void shutdown() override
Definition: datastream_ROS.cpp:418
PJ::DataStreamer::parserFactories
const ParserFactories * parserFactories() const
ros::SubscribeOptions::initByFullCallbackType
void initByFullCallbackType(const std::string &_topic, uint32_t _queue_size, const boost::function< void(P)> &_callback, const boost::function< boost::shared_ptr< typename ParameterAdapter< P >::Message >(void)> &factory_fn=DefaultMessageCreator< typename ParameterAdapter< P >::Message >())
PJ::DataStreamer::dataMap
PlotDataMapRef & dataMap()
rosbag::bagmode::Write
Write
it
iterator it
void
j template void())
PJ::TimeseriesBase::Point
typename PlotDataBase< double, Value >::Point Point
PJ::DataStreamer::closed
void closed()
PJ::CompositeParser::clear
void clear()
Definition: parser_configuration.h:44
RosIntrospection
callback_queue.h
PJ::CompositeParser::parseMessage
bool parseMessage(const std::string &topic_name, MessageRef serialized_msg, double &timestamp)
Definition: parser_configuration.cpp:102
args
args
DataStreamROS::_running
bool _running
Definition: datastream_ROS.h:53
shape_shifter.hpp
CreateParserROS
std::shared_ptr< PJ::MessageParser > CreateParserROS(const PJ::ParserFactories &factories, const std::string &topic_name, const std::string &type_name, const std::string &definition, PJ::PlotDataMapRef &data)
Definition: ros1_parser.h:7
DataStreamROS::DataStreamROS
DataStreamROS()
Definition: datastream_ROS.cpp:29
DataStreamROS::_action_saveAny
QAction * _action_saveAny
Definition: datastream_ROS.h:70
ros::SubscribeOptions
ros::Time
DialogSelectRosTopics::getResult
PJ::RosParserConfig getResult() const
Definition: dialog_select_ros_topics.cpp:154
PJ::DataStreamer::clearBuffers
void clearBuffers()
PJ::DataStreamer::dataReceived
void dataReceived()
PJ::MessageRef
DataStreamROS::_parser
PJ::CompositeParser _parser
Definition: datastream_ROS.h:78
datatype
const char * datatype()
RosIntrospection::ShapeShifter
DataStreamROS::_node
ros::NodeHandlePtr _node
Definition: datastream_ROS.h:62
rosbag
PJ::DataStreamer::mutex
std::mutex & mutex()
shape_shifter_factory.hpp
PJ::PlotDataMapRef::user_defined
AnySeriesMap user_defined
DataStreamROS::_prefix
std::string _prefix
Definition: datastream_ROS.h:60
ros::WallDuration
ros::master::TopicInfo
DataStreamROS::availableActions
virtual const std::vector< QAction * > & availableActions() override
Definition: datastream_ROS.cpp:455
PJ::RosParserConfig::topics
QStringList topics
Definition: parser_configuration.h:16
PJ::PlotDataAny
TimeseriesBase< std::any > PlotDataAny
parse_event_t::key
@ key
DataStreamROS::_msg_index
std::map< std::string, int > _msg_index
Definition: datastream_ROS.h:74
DataStreamROS::topicCallback
void topicCallback(const RosIntrospection::ShapeShifter::ConstPtr &msg, const std::string &topic_name)
Definition: datastream_ROS.cpp:63
ros::CallbackQueue::callAvailable
void callAvailable()
buffer::size
auto size() const FMT_NOEXCEPT -> size_t
DataStreamROS::_periodic_timer
QTimer * _periodic_timer
Definition: datastream_ROS.h:80
ret
ret
RosManager::getNode
static ros::NodeHandlePtr getNode()
Definition: qnodedialog.cpp:135
nullptr
#define nullptr
ros::getGlobalCallbackQueue
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
RosIntrospectionFactory::getShapeShifter
static RosIntrospection::ShapeShifter * getShapeShifter(const std::string &topic_name)
Definition: shape_shifter_factory.hpp:60
ros::Time::now
static Time now()
PJ::CompositeParser::addParser
void addParser(const std::string &topic_name, std::shared_ptr< PJ::MessageParser > parser)
Definition: parser_configuration.cpp:78


plotjuggler_ros
Author(s): Davide Faconti
autogenerated on Wed Feb 21 2024 03:22:55