00001 #include "datastream_ROS.h"
00002 #include <QTextStream>
00003 #include <QFile>
00004 #include <QMessageBox>
00005 #include <QDebug>
00006 #include <thread>
00007 #include <mutex>
00008 #include <chrono>
00009 #include <thread>
00010 #include <QProgressDialog>
00011 #include <QtGlobal>
00012 #include <QApplication>
00013 #include <QProcess>
00014 #include <QCheckBox>
00015 #include <QSettings>
00016 #include <QFileDialog>
00017 #include <ros/callback_queue.h>
00018 #include <rosbag/bag.h>
00019 #include <topic_tools/shape_shifter.h>
00020 #include <ros/transport_hints.h>
00021
00022 #include "../dialog_select_ros_topics.h"
00023 #include "../rule_editing.h"
00024 #include "../qnodedialog.h"
00025 #include "../shape_shifter_factory.hpp"
00026
00027 DataStreamROS::DataStreamROS():
00028 DataStreamer(),
00029 _node(nullptr),
00030 _destination_data(nullptr),
00031 _action_saveIntoRosbag(nullptr),
00032 _prev_clock_time(0)
00033 {
00034 _running = false;
00035 _periodic_timer = new QTimer();
00036 connect( _periodic_timer, &QTimer::timeout,
00037 this, &DataStreamROS::timerCallback);
00038
00039 loadDefaultSettings();
00040 }
00041
00042 void DataStreamROS::topicCallback(const topic_tools::ShapeShifter::ConstPtr& msg,
00043 const std::string &topic_name)
00044 {
00045 if( !_running ){
00046 return;
00047 }
00048
00049 using namespace RosIntrospection;
00050 const auto& md5sum = msg->getMD5Sum();
00051 const auto& datatype = msg->getDataType();
00052 const auto& definition = msg->getMessageDefinition() ;
00053
00054
00055 _ros_parser.registerSchema( topic_name, md5sum,
00056 RosIntrospection::ROSType(datatype),
00057 definition);
00058
00059 RosIntrospectionFactory::registerMessage(topic_name, md5sum, datatype, definition );
00060
00061
00062
00063
00064 static std::vector<uint8_t> buffer;
00065
00066 buffer.resize( msg->size() );
00067
00068 ros::serialization::OStream stream(buffer.data(), buffer.size());
00069 msg->write(stream);
00070
00071 double msg_time = ros::Time::now().toSec();
00072
00073 if( msg_time < _prev_clock_time )
00074 {
00075
00076 for (auto& it: dataMap().numeric ) {
00077 it.second.clear();
00078 auto dst = _destination_data->numeric.find(it.first);
00079 if( dst != _destination_data->numeric.end()){
00080 dst->second.clear();
00081 }
00082 }
00083 for (auto& it: dataMap().user_defined ){
00084 it.second.clear();
00085 auto dst = _destination_data->user_defined.find(it.first);
00086 if( dst != _destination_data->user_defined.end()){
00087 dst->second.clear();
00088 }
00089 }
00090 }
00091 _prev_clock_time = msg_time;
00092
00093 MessageRef buffer_view( buffer );
00094 _ros_parser.pushMessageRef( topic_name, buffer_view, msg_time );
00095
00096 std::lock_guard<std::mutex> lock( mutex() );
00097 const std::string prefixed_topic_name = _prefix + topic_name;
00098
00099
00100
00101 {
00102 auto plot_pair = dataMap().user_defined.find( prefixed_topic_name );
00103 if( plot_pair == dataMap().user_defined.end() )
00104 {
00105 plot_pair = dataMap().addUserDefined( prefixed_topic_name );
00106 }
00107 PlotDataAny& user_defined_data = plot_pair->second;
00108 user_defined_data.pushBack( PlotDataAny::Point(msg_time, nonstd::any(std::move(buffer)) ));
00109 }
00110
00111 _ros_parser.extractData(dataMap(), _prefix);
00112
00113
00114 {
00115 int& index = _msg_index[topic_name];
00116 index++;
00117 const std::string key = prefixed_topic_name + ("/_MSG_INDEX_") ;
00118 auto index_it = dataMap().numeric.find(key);
00119 if( index_it == dataMap().numeric.end())
00120 {
00121 index_it = dataMap().addNumeric( key );
00122 }
00123 index_it->second.pushBack( PlotData::Point(msg_time, index) );
00124 }
00125 }
00126
00127 void DataStreamROS::extractInitialSamples()
00128 {
00129 using namespace std::chrono;
00130 milliseconds wait_time_ms(1000);
00131
00132 QProgressDialog progress_dialog;
00133 progress_dialog.setLabelText( "Collecting ROS topic samples to understand data layout. ");
00134 progress_dialog.setRange(0, wait_time_ms.count());
00135 progress_dialog.setAutoClose(true);
00136 progress_dialog.setAutoReset(true);
00137
00138 progress_dialog.show();
00139
00140 auto start_time = system_clock::now();
00141
00142 while ( system_clock::now() - start_time < (wait_time_ms) )
00143 {
00144 ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0.1));
00145 int i = duration_cast<milliseconds>(system_clock::now() - start_time).count() ;
00146 progress_dialog.setValue( i );
00147 QApplication::processEvents();
00148 if( progress_dialog.wasCanceled() )
00149 {
00150 break;
00151 }
00152 }
00153
00154 if( progress_dialog.wasCanceled() == false )
00155 {
00156 progress_dialog.cancel();
00157 }
00158 }
00159
00160 void DataStreamROS::timerCallback()
00161 {
00162 if( _running && ros::master::check() == false
00163 && !_roscore_disconnection_already_notified)
00164 {
00165 auto ret = QMessageBox::warning(nullptr,
00166 tr("Disconnected!"),
00167 tr("The roscore master cannot be detected.\n\n"
00168 "Do you want to try reconnecting to it? \n\n"
00169 "NOTE: if you select CONTINUE, you might need"
00170 " to stop and restart this plugin."),
00171 tr("Stop Plugin"),
00172 tr("Try reconnect"),
00173 tr("Continue"),
00174 0);
00175 _roscore_disconnection_already_notified = ( ret == 2 );
00176 if( ret == 1 )
00177 {
00178 this->shutdown();
00179 _node = RosManager::getNode();
00180
00181 if( !_node ){
00182 emit connectionClosed();
00183 return;
00184 }
00185 subscribe();
00186
00187 _running = true;
00188 _spinner = std::make_shared<ros::AsyncSpinner>(1);
00189 _spinner->start();
00190 _periodic_timer->start();
00191 }
00192 else if( ret == 0)
00193 {
00194 this->shutdown();
00195 emit connectionClosed();
00196 }
00197 }
00198 }
00199
00200 void DataStreamROS::saveIntoRosbag(const PlotDataMapRef& data)
00201 {
00202
00203 if( data.user_defined.empty()){
00204 QMessageBox::warning(nullptr, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n") );
00205 return;
00206 }
00207
00208 QFileDialog saveDialog;
00209 saveDialog.setAcceptMode(QFileDialog::AcceptSave);
00210 saveDialog.setDefaultSuffix("bag");
00211 saveDialog.exec();
00212
00213 if(saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
00214 {
00215 return;
00216 }
00217
00218 QString fileName = saveDialog.selectedFiles().first();
00219
00220 if( fileName.size() > 0)
00221 {
00222 rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write );
00223
00224 for (const auto& it: data.user_defined )
00225 {
00226 const std::string& topicname = it.first;
00227 const auto& plotdata = it.second;
00228
00229 auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
00230 if(!registered_msg_type) continue;
00231
00232 RosIntrospection::ShapeShifter msg;
00233 msg.morph(registered_msg_type->getMD5Sum(),
00234 registered_msg_type->getDataType(),
00235 registered_msg_type->getMessageDefinition());
00236
00237 for (int i=0; i< plotdata.size(); i++)
00238 {
00239 const auto& point = plotdata.at(i);
00240 const PlotDataAny::TimeType msg_time = point.x;
00241 const nonstd::any& type_erased_buffer = point.y;
00242
00243 if(type_erased_buffer.type() != typeid( std::vector<uint8_t> ))
00244 {
00245
00246 continue;
00247 }
00248
00249 std::vector<uint8_t> raw_buffer = nonstd::any_cast<std::vector<uint8_t>>( type_erased_buffer );
00250 ros::serialization::IStream stream( raw_buffer.data(), raw_buffer.size() );
00251 msg.read( stream );
00252
00253 rosbag.write( topicname, ros::Time(msg_time), msg);
00254 }
00255 }
00256 rosbag.close();
00257
00258 QProcess process;
00259 QStringList args;
00260 args << "reindex" << fileName;
00261 process.start("rosbag" , args);
00262 }
00263 }
00264
00265
00266 void DataStreamROS::subscribe()
00267 {
00268 _subscribers.clear();
00269 {
00270 boost::function<void(const rosgraph_msgs::Clock::ConstPtr&) > callback;
00271 callback = [this](const rosgraph_msgs::Clock::ConstPtr& msg) -> void
00272 {
00273 this->clockCallback(msg) ;
00274 };
00275 }
00276
00277 for (int i=0; i< _config.selected_topics.size(); i++ )
00278 {
00279 const std::string topic_name = _config.selected_topics[i].toStdString();
00280 boost::function<void(const topic_tools::ShapeShifter::ConstPtr&) > callback;
00281 callback = [this, topic_name](const topic_tools::ShapeShifter::ConstPtr& msg) -> void
00282 {
00283 this->topicCallback(msg, topic_name) ;
00284 };
00285
00286 ros::SubscribeOptions ops;
00287 ops.initByFullCallbackType(topic_name, 1, callback);
00288 ops.transport_hints = ros::TransportHints().tcpNoDelay();
00289
00290 _subscribers.insert( {topic_name, _node->subscribe(ops) } );
00291 }
00292 }
00293
00294 bool DataStreamROS::start(QStringList* selected_datasources)
00295 {
00296 _ros_parser.clear();
00297 if( !_node )
00298 {
00299 _node = RosManager::getNode();
00300 }
00301
00302 if( !_node ){
00303 return false;
00304 }
00305
00306 {
00307 std::lock_guard<std::mutex> lock( mutex() );
00308 dataMap().numeric.clear();
00309 dataMap().user_defined.clear();
00310 }
00311
00312 using namespace RosIntrospection;
00313
00314 std::vector<std::pair<QString,QString>> all_topics;
00315 ros::master::V_TopicInfo topic_infos;
00316 ros::master::getTopics(topic_infos);
00317 for (ros::master::TopicInfo topic_info: topic_infos)
00318 {
00319 all_topics.push_back(
00320 std::make_pair(QString(topic_info.name.c_str()),
00321 QString(topic_info.datatype.c_str()) ) );
00322 }
00323
00324 QTimer timer;
00325 timer.setSingleShot(false);
00326 timer.setInterval( 1000);
00327 timer.start();
00328
00329 DialogSelectRosTopics dialog( all_topics, _config );
00330
00331 connect( &timer, &QTimer::timeout, [&]()
00332 {
00333 all_topics.clear();
00334 topic_infos.clear();
00335 ros::master::getTopics(topic_infos);
00336 for (ros::master::TopicInfo topic_info: topic_infos)
00337 {
00338 all_topics.push_back(
00339 std::make_pair(QString(topic_info.name.c_str()),
00340 QString(topic_info.datatype.c_str()) ) );
00341 }
00342 dialog.updateTopicList(all_topics);
00343 });
00344
00345 int res = dialog.exec();
00346
00347 _config = dialog.getResult();
00348
00349 timer.stop();
00350
00351 if( res != QDialog::Accepted || _config.selected_topics.empty() )
00352 {
00353 return false;
00354 }
00355
00356 saveDefaultSettings();
00357
00358 _ros_parser.setUseHeaderStamp( _config.use_header_stamp );
00359
00360 if( _config.use_renaming_rules )
00361 {
00362 _ros_parser.addRules( RuleEditing::getRenamingRules() );
00363 }
00364
00365 _ros_parser.setMaxArrayPolicy( _config.max_array_size, _config.discard_large_arrays );
00366
00367
00368 subscribe();
00369 _running = true;
00370
00371 extractInitialSamples();
00372
00373 _spinner = std::make_shared<ros::AsyncSpinner>(1);
00374 _spinner->start();
00375
00376 _periodic_timer->setInterval(500);
00377 _roscore_disconnection_already_notified = false;
00378 _periodic_timer->start();
00379
00380 return true;
00381 }
00382
00383 bool DataStreamROS::isRunning() const { return _running; }
00384
00385 void DataStreamROS::shutdown()
00386 {
00387 _periodic_timer->stop();
00388 if(_spinner)
00389 {
00390 _spinner->stop();
00391 }
00392 for(auto& it: _subscribers)
00393 {
00394 it.second.shutdown();
00395 }
00396 _subscribers.clear();
00397 _running = false;
00398 _node.reset();
00399 _spinner.reset();
00400 }
00401
00402 DataStreamROS::~DataStreamROS()
00403 {
00404 shutdown();
00405 }
00406
00407 bool DataStreamROS::xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
00408 {
00409 QDomElement stamp_elem = doc.createElement("use_header_stamp");
00410 stamp_elem.setAttribute("value", _config.use_header_stamp ? "true" : "false");
00411 plugin_elem.appendChild( stamp_elem );
00412
00413 QDomElement rename_elem = doc.createElement("use_renaming_rules");
00414 rename_elem.setAttribute("value", _config.use_renaming_rules ? "true" : "false");
00415 plugin_elem.appendChild( rename_elem );
00416
00417 QDomElement discard_elem = doc.createElement("discard_large_arrays");
00418 discard_elem.setAttribute("value", _config.discard_large_arrays ? "true" : "false");
00419 plugin_elem.appendChild( discard_elem );
00420
00421 QDomElement max_elem = doc.createElement("max_array_size");
00422 max_elem.setAttribute("value", QString::number(_config.max_array_size));
00423 plugin_elem.appendChild( max_elem );
00424
00425 return true;
00426 }
00427
00428 bool DataStreamROS::xmlLoadState(const QDomElement &parent_element)
00429 {
00430 QDomElement stamp_elem = parent_element.firstChildElement( "use_header_stamp" );
00431 _config.use_header_stamp = ( stamp_elem.attribute("value") == "true");
00432
00433 QDomElement rename_elem = parent_element.firstChildElement( "use_renaming_rules" );
00434 _config.use_renaming_rules = ( rename_elem.attribute("value") == "true");
00435
00436 QDomElement discard_elem = parent_element.firstChildElement( "discard_large_arrays" );
00437 _config.discard_large_arrays = ( discard_elem.attribute("value") == "true");
00438
00439 QDomElement max_elem = parent_element.firstChildElement( "max_array_size" );
00440 _config.max_array_size = max_elem.attribute("value").toInt();
00441
00442 return true;
00443 }
00444
00445 void DataStreamROS::addActionsToParentMenu(QMenu *menu)
00446 {
00447 _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"), menu);
00448 QIcon iconSave;
00449 iconSave.addFile(QStringLiteral(":/icons/resources/light/save.png"), QSize(26, 26));
00450 _action_saveIntoRosbag->setIcon(iconSave);
00451 menu->addAction( _action_saveIntoRosbag );
00452
00453 connect( _action_saveIntoRosbag, &QAction::triggered, this, [this]()
00454 {
00455 DataStreamROS::saveIntoRosbag( *_destination_data );
00456 });
00457 }
00458
00459 void DataStreamROS::saveDefaultSettings()
00460 {
00461 QSettings settings;
00462
00463 settings.setValue("DataStreamROS/default_topics", _config.selected_topics);
00464 settings.setValue("DataStreamROS/use_renaming", _config.use_renaming_rules);
00465 settings.setValue("DataStreamROS/use_header_stamp", _config.use_header_stamp);
00466 settings.setValue("DataStreamROS/max_array_size", (int)_config.max_array_size);
00467 settings.setValue("DataStreamROS/discard_large_arrays", _config.discard_large_arrays);
00468 }
00469
00470
00471 void DataStreamROS::loadDefaultSettings()
00472 {
00473 QSettings settings;
00474 _config.selected_topics = settings.value("DataStreamROS/default_topics", false ).toStringList();
00475 _config.use_header_stamp = settings.value("DataStreamROS/use_header_stamp", false ).toBool();
00476 _config.use_renaming_rules = settings.value("DataStreamROS/use_renaming", true ).toBool();
00477 _config.max_array_size = settings.value("DataStreamROS/max_array_size", 100 ).toInt();
00478 _config.discard_large_arrays = settings.value("DataStreamROS/discard_large_arrays", true ).toBool();
00479 }
00480
00481