datastream_ROS.cpp
Go to the documentation of this file.
1 #include "datastream_ROS.h"
2 #include <QTextStream>
3 #include <QFile>
4 #include <QMessageBox>
5 #include <QDebug>
6 #include <thread>
7 #include <mutex>
8 #include <chrono>
9 #include <thread>
10 #include <QProgressDialog>
11 #include <QtGlobal>
12 #include <QApplication>
13 #include <QProcess>
14 #include <QCheckBox>
15 #include <QSettings>
16 #include <QFileDialog>
17 #include <ros/callback_queue.h>
18 #include <rosbag/bag.h>
20 #include <ros/transport_hints.h>
21 
22 #include "../dialog_select_ros_topics.h"
23 #include "../rule_editing.h"
24 #include "../qnodedialog.h"
25 #include "../shape_shifter_factory.hpp"
26 
28  DataStreamer(),
29  _node(nullptr),
30  _destination_data(nullptr),
31  _action_saveIntoRosbag(nullptr),
32  _prev_clock_time(0)
33 {
34  _running = false;
35  _periodic_timer = new QTimer();
36  connect( _periodic_timer, &QTimer::timeout,
38 
40 }
41 
43  const std::string &topic_name)
44 {
45  if( !_running ){
46  return;
47  }
48 
49  using namespace RosIntrospection;
50  const auto& md5sum = msg->getMD5Sum();
51  const auto& datatype = msg->getDataType();
52  const auto& definition = msg->getMessageDefinition() ;
53 
54  // register the message type
55  _ros_parser.registerSchema( topic_name, md5sum,
56  RosIntrospection::ROSType(datatype),
57  definition);
58 
59  RosIntrospectionFactory::registerMessage(topic_name, md5sum, datatype, definition );
60 
61  //------------------------------------
62 
63  // it is more efficient to recycle this elements
64  static std::vector<uint8_t> buffer;
65 
66  buffer.resize( msg->size() );
67 
68  ros::serialization::OStream stream(buffer.data(), buffer.size());
69  msg->write(stream);
70 
71  double msg_time = ros::Time::now().toSec();
72 
73  if( msg_time < _prev_clock_time )
74  {
75  // clean
76  for (auto& it: dataMap().numeric ) {
77  it.second.clear();
78  auto dst = _destination_data->numeric.find(it.first);
79  if( dst != _destination_data->numeric.end()){
80  dst->second.clear();
81  }
82  }
83  for (auto& it: dataMap().user_defined ){
84  it.second.clear();
85  auto dst = _destination_data->user_defined.find(it.first);
86  if( dst != _destination_data->user_defined.end()){
87  dst->second.clear();
88  }
89  }
90  }
91  _prev_clock_time = msg_time;
92 
93  MessageRef buffer_view( buffer );
94  _ros_parser.pushMessageRef( topic_name, buffer_view, msg_time );
95 
96  std::lock_guard<std::mutex> lock( mutex() );
97  const std::string prefixed_topic_name = _prefix + topic_name;
98 
99  // adding raw serialized msg for future uses.
100  // do this before msg_time normalization
101  {
102  auto plot_pair = dataMap().user_defined.find( prefixed_topic_name );
103  if( plot_pair == dataMap().user_defined.end() )
104  {
105  plot_pair = dataMap().addUserDefined( prefixed_topic_name );
106  }
107  PlotDataAny& user_defined_data = plot_pair->second;
108  user_defined_data.pushBack( PlotDataAny::Point(msg_time, nonstd::any(std::move(buffer)) ));
109  }
110 
112 
113  //------------------------------
114  {
115  int& index = _msg_index[topic_name];
116  index++;
117  const std::string key = prefixed_topic_name + ("/_MSG_INDEX_") ;
118  auto index_it = dataMap().numeric.find(key);
119  if( index_it == dataMap().numeric.end())
120  {
121  index_it = dataMap().addNumeric( key );
122  }
123  index_it->second.pushBack( PlotData::Point(msg_time, index) );
124  }
125 }
126 
128 {
129  using namespace std::chrono;
130  milliseconds wait_time_ms(1000);
131 
132  QProgressDialog progress_dialog;
133  progress_dialog.setLabelText( "Collecting ROS topic samples to understand data layout. ");
134  progress_dialog.setRange(0, wait_time_ms.count());
135  progress_dialog.setAutoClose(true);
136  progress_dialog.setAutoReset(true);
137 
138  progress_dialog.show();
139 
140  auto start_time = system_clock::now();
141 
142  while ( system_clock::now() - start_time < (wait_time_ms) )
143  {
145  int i = duration_cast<milliseconds>(system_clock::now() - start_time).count() ;
146  progress_dialog.setValue( i );
147  QApplication::processEvents();
148  if( progress_dialog.wasCanceled() )
149  {
150  break;
151  }
152  }
153 
154  if( progress_dialog.wasCanceled() == false )
155  {
156  progress_dialog.cancel();
157  }
158 }
159 
161 {
162  if( _running && ros::master::check() == false
164  {
165  auto ret = QMessageBox::warning(nullptr,
166  tr("Disconnected!"),
167  tr("The roscore master cannot be detected.\n\n"
168  "Do you want to try reconnecting to it? \n\n"
169  "NOTE: if you select CONTINUE, you might need"
170  " to stop and restart this plugin."),
171  tr("Stop Plugin"),
172  tr("Try reconnect"),
173  tr("Continue"),
174  0);
176  if( ret == 1 )
177  {
178  this->shutdown();
180 
181  if( !_node ){
182  emit connectionClosed();
183  return;
184  }
185  subscribe();
186 
187  _running = true;
188  _spinner = std::make_shared<ros::AsyncSpinner>(1);
189  _spinner->start();
190  _periodic_timer->start();
191  }
192  else if( ret == 0)
193  {
194  this->shutdown();
195  emit connectionClosed();
196  }
197  }
198 }
199 
201 {
202 
203  if( data.user_defined.empty()){
204  QMessageBox::warning(nullptr, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n") );
205  return;
206  }
207 
208  QFileDialog saveDialog;
209  saveDialog.setAcceptMode(QFileDialog::AcceptSave);
210  saveDialog.setDefaultSuffix("bag");
211  saveDialog.exec();
212 
213  if(saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
214  {
215  return;
216  }
217 
218  QString fileName = saveDialog.selectedFiles().first();
219 
220  if( fileName.size() > 0)
221  {
222  rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write );
223 
224  for (const auto& it: data.user_defined )
225  {
226  const std::string& topicname = it.first;
227  const auto& plotdata = it.second;
228 
229  auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
230  if(!registered_msg_type) continue;
231 
233  msg.morph(registered_msg_type->getMD5Sum(),
234  registered_msg_type->getDataType(),
235  registered_msg_type->getMessageDefinition());
236 
237  for (int i=0; i< plotdata.size(); i++)
238  {
239  const auto& point = plotdata.at(i);
240  const PlotDataAny::TimeType msg_time = point.x;
241  const nonstd::any& type_erased_buffer = point.y;
242 
243  if(type_erased_buffer.type() != typeid( std::vector<uint8_t> ))
244  {
245  // can't cast to expected type
246  continue;
247  }
248 
249  std::vector<uint8_t> raw_buffer = nonstd::any_cast<std::vector<uint8_t>>( type_erased_buffer );
250  ros::serialization::IStream stream( raw_buffer.data(), raw_buffer.size() );
251  msg.read( stream );
252 
253  rosbag.write( topicname, ros::Time(msg_time), msg);
254  }
255  }
256  rosbag.close();
257 
258  QProcess process;
259  QStringList args;
260  args << "reindex" << fileName;
261  process.start("rosbag" , args);
262  }
263 }
264 
265 
267 {
268  _subscribers.clear();
269  {
270  boost::function<void(const rosgraph_msgs::Clock::ConstPtr&) > callback;
271  callback = [this](const rosgraph_msgs::Clock::ConstPtr& msg) -> void
272  {
273  this->clockCallback(msg) ;
274  };
275  }
276 
277  for (int i=0; i< _config.selected_topics.size(); i++ )
278  {
279  const std::string topic_name = _config.selected_topics[i].toStdString();
280  boost::function<void(const topic_tools::ShapeShifter::ConstPtr&) > callback;
281  callback = [this, topic_name](const topic_tools::ShapeShifter::ConstPtr& msg) -> void
282  {
283  this->topicCallback(msg, topic_name) ;
284  };
285 
287  ops.initByFullCallbackType(topic_name, 1, callback);
289 
290  _subscribers.insert( {topic_name, _node->subscribe(ops) } );
291  }
292 }
293 
294 bool DataStreamROS::start(QStringList* selected_datasources)
295 {
296  _ros_parser.clear();
297  if( !_node )
298  {
300  }
301 
302  if( !_node ){
303  return false;
304  }
305 
306  {
307  std::lock_guard<std::mutex> lock( mutex() );
308  dataMap().numeric.clear();
309  dataMap().user_defined.clear();
310  }
311 
312  using namespace RosIntrospection;
313 
314  std::vector<std::pair<QString,QString>> all_topics;
315  ros::master::V_TopicInfo topic_infos;
316  ros::master::getTopics(topic_infos);
317  for (ros::master::TopicInfo topic_info: topic_infos)
318  {
319  all_topics.push_back(
320  std::make_pair(QString(topic_info.name.c_str()),
321  QString(topic_info.datatype.c_str()) ) );
322  }
323 
324  QTimer timer;
325  timer.setSingleShot(false);
326  timer.setInterval( 1000);
327  timer.start();
328 
329  DialogSelectRosTopics dialog( all_topics, _config );
330 
331  connect( &timer, &QTimer::timeout, [&]()
332  {
333  all_topics.clear();
334  topic_infos.clear();
335  ros::master::getTopics(topic_infos);
336  for (ros::master::TopicInfo topic_info: topic_infos)
337  {
338  all_topics.push_back(
339  std::make_pair(QString(topic_info.name.c_str()),
340  QString(topic_info.datatype.c_str()) ) );
341  }
342  dialog.updateTopicList(all_topics);
343  });
344 
345  int res = dialog.exec();
346 
347  _config = dialog.getResult();
348 
349  timer.stop();
350 
351  if( res != QDialog::Accepted || _config.selected_topics.empty() )
352  {
353  return false;
354  }
355 
357 
359 
361  {
363  }
364 
366 
367  //-------------------------
368  subscribe();
369  _running = true;
370 
372 
373  _spinner = std::make_shared<ros::AsyncSpinner>(1);
374  _spinner->start();
375 
376  _periodic_timer->setInterval(500);
378  _periodic_timer->start();
379 
380  return true;
381 }
382 
383 bool DataStreamROS::isRunning() const { return _running; }
384 
386 {
387  _periodic_timer->stop();
388  if(_spinner)
389  {
390  _spinner->stop();
391  }
392  for(auto& it: _subscribers)
393  {
394  it.second.shutdown();
395  }
396  _subscribers.clear();
397  _running = false;
398  _node.reset();
399  _spinner.reset();
400 }
401 
403 {
404  shutdown();
405 }
406 
407 bool DataStreamROS::xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
408 {
409  QDomElement stamp_elem = doc.createElement("use_header_stamp");
410  stamp_elem.setAttribute("value", _config.use_header_stamp ? "true" : "false");
411  plugin_elem.appendChild( stamp_elem );
412 
413  QDomElement rename_elem = doc.createElement("use_renaming_rules");
414  rename_elem.setAttribute("value", _config.use_renaming_rules ? "true" : "false");
415  plugin_elem.appendChild( rename_elem );
416 
417  QDomElement discard_elem = doc.createElement("discard_large_arrays");
418  discard_elem.setAttribute("value", _config.discard_large_arrays ? "true" : "false");
419  plugin_elem.appendChild( discard_elem );
420 
421  QDomElement max_elem = doc.createElement("max_array_size");
422  max_elem.setAttribute("value", QString::number(_config.max_array_size));
423  plugin_elem.appendChild( max_elem );
424 
425  return true;
426 }
427 
428 bool DataStreamROS::xmlLoadState(const QDomElement &parent_element)
429 {
430  QDomElement stamp_elem = parent_element.firstChildElement( "use_header_stamp" );
431  _config.use_header_stamp = ( stamp_elem.attribute("value") == "true");
432 
433  QDomElement rename_elem = parent_element.firstChildElement( "use_renaming_rules" );
434  _config.use_renaming_rules = ( rename_elem.attribute("value") == "true");
435 
436  QDomElement discard_elem = parent_element.firstChildElement( "discard_large_arrays" );
437  _config.discard_large_arrays = ( discard_elem.attribute("value") == "true");
438 
439  QDomElement max_elem = parent_element.firstChildElement( "max_array_size" );
440  _config.max_array_size = max_elem.attribute("value").toInt();
441 
442  return true;
443 }
444 
446 {
447  _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"), menu);
448  QIcon iconSave;
449  iconSave.addFile(QStringLiteral(":/icons/resources/light/save.png"), QSize(26, 26));
450  _action_saveIntoRosbag->setIcon(iconSave);
451  menu->addAction( _action_saveIntoRosbag );
452 
453  connect( _action_saveIntoRosbag, &QAction::triggered, this, [this]()
454  {
456  });
457 }
458 
460 {
461  QSettings settings;
462 
463  settings.setValue("DataStreamROS/default_topics", _config.selected_topics);
464  settings.setValue("DataStreamROS/use_renaming", _config.use_renaming_rules);
465  settings.setValue("DataStreamROS/use_header_stamp", _config.use_header_stamp);
466  settings.setValue("DataStreamROS/max_array_size", (int)_config.max_array_size);
467  settings.setValue("DataStreamROS/discard_large_arrays", _config.discard_large_arrays);
468 }
469 
470 
472 {
473  QSettings settings;
474  _config.selected_topics = settings.value("DataStreamROS/default_topics", false ).toStringList();
475  _config.use_header_stamp = settings.value("DataStreamROS/use_header_stamp", false ).toBool();
476  _config.use_renaming_rules = settings.value("DataStreamROS/use_renaming", true ).toBool();
477  _config.max_array_size = settings.value("DataStreamROS/max_array_size", 100 ).toInt();
478  _config.discard_large_arrays = settings.value("DataStreamROS/discard_large_arrays", true ).toBool();
479 }
480 
481 
ValueType any_cast(any const &operand)
Definition: any.hpp:432
void topicCallback(const topic_tools::ShapeShifter::ConstPtr &msg, const std::string &topic_name)
std::unordered_map< std::string, PlotData > numeric
Definition: plotdata.h:144
QTimer * _periodic_timer
PlotDataMapRef & dataMap()
ROSCPP_DECL bool check()
void initByFullCallbackType(const std::string &_topic, uint32_t _queue_size, const boost::function< void(P)> &_callback, const boost::function< boost::shared_ptr< typename ParameterAdapter< P >::Message >(void)> &factory_fn=DefaultMessageCreator< typename ParameterAdapter< P >::Message >())
std::mutex & mutex()
static void registerMessage(const std::string &topic_name, const std::string &md5sum, const std::string &datatype, const std::string &definition)
void connectionClosed()
void updateTopicList(std::vector< std::pair< QString, QString >> topic_list)
void pushBack(Point p)
Definition: plotdata.h:214
ros::NodeHandlePtr _node
void extractInitialSamples()
virtual void addActionsToParentMenu(QMenu *menu) override
ROSCPP_DECL bool getTopics(V_TopicInfo &topics)
void saveDefaultSettings()
static RosIntrospectionFactory & get()
void loadDefaultSettings()
std::unordered_map< std::string, PlotDataAny >::iterator addUserDefined(const std::string &name)
Definition: plotdata.h:156
std::vector< TopicInfo > V_TopicInfo
std::string _prefix
virtual void shutdown() override
std::unordered_map< std::string, PlotDataAny > user_defined
Definition: plotdata.h:145
void morph(const std::string &md5sum, const std::string &datatype_, const std::string &msg_def_)
ROSCPP_DECL CallbackQueue * getGlobalCallbackQueue()
TransportHints & tcpNoDelay(bool nodelay=true)
PlotDataMapRef * _destination_data
void read(Stream &stream)
virtual bool start(QStringList *selected_datasources) override
std::map< std::string, int > _msg_index
bool _roscore_disconnection_already_notified
void setMaxArrayPolicy(size_t max_array_size, bool discard_entire_array)
Definition: ros_parser.cpp:58
void pushMessageRef(const std::string &topic_name, const MessageRef &msg, double timestamp) override
Definition: ros_parser.cpp:104
virtual bool xmlLoadState(const QDomElement &parent_element) override
static RosIntrospection::SubstitutionRuleMap getRenamingRules()
virtual void extractData(PlotDataMapRef &destination, const std::string &prefix) override
Definition: ros_parser.cpp:240
Configuration getResult() const
static ros::NodeHandlePtr getNode()
TransportHints transport_hints
const char * msg
virtual ~DataStreamROS() override
double _prev_clock_time
RosMessageParser _ros_parser
virtual bool xmlSaveState(QDomDocument &doc, QDomElement &parent_element) const override
DialogSelectRosTopics::Configuration _config
std::unordered_map< std::string, PlotData >::iterator addNumeric(const std::string &name)
Definition: plotdata.h:147
static RosIntrospection::ShapeShifter * getShapeShifter(const std::string &topic_name)
static Time now()
virtual bool isRunning() const override
int i
The DataStreamer base class to create your own plugin.
const std::type_info & type() const any_noexcept
Definition: any.hpp:342
void clockCallback(const rosgraph_msgs::Clock::ConstPtr &msg)
std::map< std::string, ros::Subscriber > _subscribers
void setUseHeaderStamp(bool use)
static void saveIntoRosbag(const PlotDataMapRef &data)
void addRules(const RosIntrospection::SubstitutionRuleMap &rules)
Definition: ros_parser.h:17
bool registerSchema(const std::string &topic_name, const std::string &md5sum, RosIntrospection::ROSType type, const std::string &definition)
Definition: ros_parser.cpp:83
std::shared_ptr< ros::AsyncSpinner > _spinner
QAction * _action_saveIntoRosbag


plotjuggler
Author(s): Davide Faconti
autogenerated on Sat Jul 6 2019 03:44:17