datastream_ROS.cpp
Go to the documentation of this file.
00001 #include "datastream_ROS.h"
00002 #include <QTextStream>
00003 #include <QFile>
00004 #include <QMessageBox>
00005 #include <QDebug>
00006 #include <thread>
00007 #include <mutex>
00008 #include <chrono>
00009 #include <thread>
00010 #include <QProgressDialog>
00011 #include <QtGlobal>
00012 #include <QApplication>
00013 #include <QProcess>
00014 #include <QCheckBox>
00015 #include <QSettings>
00016 #include <QFileDialog>
00017 #include <ros/callback_queue.h>
00018 #include <rosbag/bag.h>
00019 #include <topic_tools/shape_shifter.h>
00020 #include <ros/transport_hints.h>
00021 
00022 #include "../dialog_select_ros_topics.h"
00023 #include "../rule_editing.h"
00024 #include "../qnodedialog.h"
00025 #include "../shape_shifter_factory.hpp"
00026 
00027 DataStreamROS::DataStreamROS():
00028     DataStreamer(),
00029     _node(nullptr),
00030     _destination_data(nullptr),
00031     _action_saveIntoRosbag(nullptr),
00032     _prev_clock_time(0)
00033 {
00034     _running = false;
00035     _periodic_timer = new QTimer();
00036     connect( _periodic_timer, &QTimer::timeout,
00037              this, &DataStreamROS::timerCallback);
00038 
00039     loadDefaultSettings();
00040 }
00041 
00042 void DataStreamROS::topicCallback(const topic_tools::ShapeShifter::ConstPtr& msg,
00043                                   const std::string &topic_name)
00044 {
00045     if( !_running ){
00046         return;
00047     }
00048 
00049     using namespace RosIntrospection;
00050     const auto&  md5sum     =  msg->getMD5Sum();
00051     const auto&  datatype   =  msg->getDataType();
00052     const auto&  definition =  msg->getMessageDefinition() ;
00053 
00054     // register the message type
00055     _ros_parser.registerSchema( topic_name, md5sum,
00056                                 RosIntrospection::ROSType(datatype),
00057                                 definition);
00058 
00059     RosIntrospectionFactory::registerMessage(topic_name, md5sum, datatype, definition );
00060 
00061     //------------------------------------
00062 
00063     // it is more efficient to recycle this elements
00064     static std::vector<uint8_t> buffer;
00065     
00066     buffer.resize( msg->size() );
00067     
00068     ros::serialization::OStream stream(buffer.data(), buffer.size());
00069     msg->write(stream);
00070 
00071     double msg_time = ros::Time::now().toSec();
00072 
00073     if( msg_time < _prev_clock_time )
00074     {
00075         // clean
00076         for (auto& it: dataMap().numeric ) {
00077             it.second.clear();
00078             auto dst = _destination_data->numeric.find(it.first);
00079             if( dst != _destination_data->numeric.end()){
00080                 dst->second.clear();
00081             }
00082         }
00083         for (auto& it: dataMap().user_defined ){
00084             it.second.clear();
00085             auto dst = _destination_data->user_defined.find(it.first);
00086             if( dst != _destination_data->user_defined.end()){
00087                 dst->second.clear();
00088             }
00089         }
00090     }
00091     _prev_clock_time = msg_time;
00092 
00093     MessageRef buffer_view( buffer );
00094     _ros_parser.pushMessageRef( topic_name, buffer_view, msg_time );
00095 
00096     std::lock_guard<std::mutex> lock( mutex() );
00097     const std::string prefixed_topic_name = _prefix + topic_name;
00098 
00099     // adding raw serialized msg for future uses.
00100     // do this before msg_time normalization
00101     {
00102         auto plot_pair = dataMap().user_defined.find( prefixed_topic_name );
00103         if( plot_pair == dataMap().user_defined.end() )
00104         {
00105             plot_pair = dataMap().addUserDefined( prefixed_topic_name );
00106         }
00107         PlotDataAny& user_defined_data = plot_pair->second;
00108         user_defined_data.pushBack( PlotDataAny::Point(msg_time, nonstd::any(std::move(buffer)) ));
00109     }
00110 
00111     _ros_parser.extractData(dataMap(), _prefix);
00112 
00113     //------------------------------
00114     {
00115         int& index = _msg_index[topic_name];
00116         index++;
00117         const std::string key = prefixed_topic_name + ("/_MSG_INDEX_") ;
00118         auto index_it = dataMap().numeric.find(key);
00119         if( index_it == dataMap().numeric.end())
00120         {
00121             index_it = dataMap().addNumeric( key );
00122         }
00123         index_it->second.pushBack( PlotData::Point(msg_time, index) );
00124     }
00125 }
00126 
00127 void DataStreamROS::extractInitialSamples()
00128 {
00129     using namespace std::chrono;
00130     milliseconds wait_time_ms(1000);
00131 
00132     QProgressDialog progress_dialog;
00133     progress_dialog.setLabelText( "Collecting ROS topic samples to understand data layout. ");
00134     progress_dialog.setRange(0, wait_time_ms.count());
00135     progress_dialog.setAutoClose(true);
00136     progress_dialog.setAutoReset(true);
00137 
00138     progress_dialog.show();
00139 
00140     auto start_time = system_clock::now();
00141 
00142     while ( system_clock::now() - start_time < (wait_time_ms) )
00143     {
00144         ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0.1));
00145         int i = duration_cast<milliseconds>(system_clock::now() - start_time).count() ;
00146         progress_dialog.setValue( i );
00147         QApplication::processEvents();
00148         if( progress_dialog.wasCanceled() )
00149         {
00150             break;
00151         }
00152     }
00153 
00154     if( progress_dialog.wasCanceled() == false )
00155     {
00156         progress_dialog.cancel();
00157     }
00158 }
00159 
00160 void DataStreamROS::timerCallback()
00161 {
00162     if( _running && ros::master::check() == false
00163             && !_roscore_disconnection_already_notified)
00164     {
00165         auto ret = QMessageBox::warning(nullptr,
00166                                         tr("Disconnected!"),
00167                                         tr("The roscore master cannot be detected.\n\n"
00168                                            "Do you want to try reconnecting to it? \n\n"
00169                                            "NOTE: if you select CONTINUE, you might need"
00170                                            " to stop and restart this plugin."),
00171                                         tr("Stop Plugin"),
00172                                         tr("Try reconnect"),
00173                                         tr("Continue"),
00174                                         0);
00175         _roscore_disconnection_already_notified = ( ret == 2 );
00176         if( ret == 1 )
00177         {
00178             this->shutdown();
00179             _node =  RosManager::getNode();
00180 
00181             if( !_node ){
00182                 emit connectionClosed();
00183                 return;
00184             }
00185             subscribe();
00186 
00187             _running = true;
00188             _spinner = std::make_shared<ros::AsyncSpinner>(1);
00189             _spinner->start();
00190             _periodic_timer->start();
00191         }
00192         else if( ret == 0)
00193         {
00194             this->shutdown();
00195             emit connectionClosed();
00196         }
00197     }
00198 }
00199 
00200 void DataStreamROS::saveIntoRosbag(const PlotDataMapRef& data)
00201 {
00202 
00203     if( data.user_defined.empty()){
00204         QMessageBox::warning(nullptr, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n") );
00205         return;
00206     }
00207 
00208     QFileDialog saveDialog;
00209     saveDialog.setAcceptMode(QFileDialog::AcceptSave);
00210     saveDialog.setDefaultSuffix("bag");
00211     saveDialog.exec();
00212 
00213     if(saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
00214     {
00215         return;
00216     }
00217 
00218     QString fileName = saveDialog.selectedFiles().first();
00219 
00220     if( fileName.size() > 0)
00221     {
00222         rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write );
00223 
00224         for (const auto& it: data.user_defined )
00225         {
00226             const std::string& topicname = it.first;
00227             const auto& plotdata = it.second;
00228 
00229             auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
00230             if(!registered_msg_type) continue;
00231 
00232             RosIntrospection::ShapeShifter msg;
00233             msg.morph(registered_msg_type->getMD5Sum(),
00234                       registered_msg_type->getDataType(),
00235                       registered_msg_type->getMessageDefinition());
00236 
00237             for (int i=0; i< plotdata.size(); i++)
00238             {
00239                 const auto& point = plotdata.at(i);
00240                 const PlotDataAny::TimeType msg_time  = point.x;
00241                 const nonstd::any& type_erased_buffer = point.y;
00242 
00243                 if(type_erased_buffer.type() != typeid( std::vector<uint8_t> ))
00244                 {
00245                     // can't cast to expected type
00246                     continue;
00247                 }
00248 
00249                 std::vector<uint8_t> raw_buffer =  nonstd::any_cast<std::vector<uint8_t>>( type_erased_buffer );
00250                 ros::serialization::IStream stream( raw_buffer.data(), raw_buffer.size() );
00251                 msg.read( stream );
00252 
00253                 rosbag.write( topicname, ros::Time(msg_time), msg);
00254             }
00255         }
00256         rosbag.close();
00257 
00258         QProcess process;
00259         QStringList args;
00260         args << "reindex" << fileName;
00261         process.start("rosbag" , args);
00262     }
00263 }
00264 
00265 
00266 void DataStreamROS::subscribe()
00267 {
00268     _subscribers.clear();
00269     {
00270         boost::function<void(const rosgraph_msgs::Clock::ConstPtr&) > callback;
00271         callback = [this](const rosgraph_msgs::Clock::ConstPtr& msg) -> void
00272         {
00273             this->clockCallback(msg) ;
00274         };
00275     }
00276 
00277     for (int i=0; i< _config.selected_topics.size(); i++ )
00278     {
00279         const std::string topic_name = _config.selected_topics[i].toStdString();
00280         boost::function<void(const topic_tools::ShapeShifter::ConstPtr&) > callback;
00281         callback = [this, topic_name](const topic_tools::ShapeShifter::ConstPtr& msg) -> void
00282         {
00283             this->topicCallback(msg, topic_name) ;
00284         };
00285 
00286         ros::SubscribeOptions ops;
00287         ops.initByFullCallbackType(topic_name, 1, callback);
00288         ops.transport_hints = ros::TransportHints().tcpNoDelay();
00289 
00290         _subscribers.insert( {topic_name, _node->subscribe(ops) }  );
00291     }
00292 }
00293 
00294 bool DataStreamROS::start(QStringList* selected_datasources)
00295 {
00296     _ros_parser.clear();
00297     if( !_node )
00298     {
00299         _node =  RosManager::getNode();
00300     }
00301 
00302     if( !_node ){
00303         return false;
00304     }
00305 
00306     {
00307         std::lock_guard<std::mutex> lock( mutex() );
00308         dataMap().numeric.clear();
00309         dataMap().user_defined.clear();
00310     }
00311 
00312     using namespace RosIntrospection;
00313 
00314     std::vector<std::pair<QString,QString>> all_topics;
00315     ros::master::V_TopicInfo topic_infos;
00316     ros::master::getTopics(topic_infos);
00317     for (ros::master::TopicInfo topic_info: topic_infos)
00318     {
00319         all_topics.push_back(
00320                     std::make_pair(QString(topic_info.name.c_str()),
00321                                    QString(topic_info.datatype.c_str()) ) );
00322     }
00323 
00324     QTimer timer;
00325     timer.setSingleShot(false);
00326     timer.setInterval( 1000);
00327     timer.start();
00328 
00329     DialogSelectRosTopics dialog( all_topics, _config );
00330 
00331     connect( &timer, &QTimer::timeout, [&]()
00332     {
00333         all_topics.clear();
00334         topic_infos.clear();
00335         ros::master::getTopics(topic_infos);
00336         for (ros::master::TopicInfo topic_info: topic_infos)
00337         {
00338             all_topics.push_back(
00339                         std::make_pair(QString(topic_info.name.c_str()),
00340                                        QString(topic_info.datatype.c_str()) ) );
00341         }
00342         dialog.updateTopicList(all_topics);
00343     });
00344 
00345     int res = dialog.exec();
00346 
00347     _config = dialog.getResult();
00348 
00349     timer.stop();
00350 
00351     if( res != QDialog::Accepted || _config.selected_topics.empty() )
00352     {
00353         return false;
00354     }
00355 
00356     saveDefaultSettings();
00357 
00358     _ros_parser.setUseHeaderStamp( _config.use_header_stamp );
00359 
00360     if( _config.use_renaming_rules )
00361     {
00362         _ros_parser.addRules( RuleEditing::getRenamingRules() );
00363     }
00364 
00365     _ros_parser.setMaxArrayPolicy( _config.max_array_size, _config.discard_large_arrays );
00366 
00367     //-------------------------
00368     subscribe();
00369     _running = true;
00370 
00371     extractInitialSamples();
00372 
00373     _spinner = std::make_shared<ros::AsyncSpinner>(1);
00374     _spinner->start();
00375 
00376     _periodic_timer->setInterval(500);
00377     _roscore_disconnection_already_notified = false;
00378     _periodic_timer->start();
00379 
00380     return true;
00381 }
00382 
00383 bool DataStreamROS::isRunning() const { return _running; }
00384 
00385 void DataStreamROS::shutdown()
00386 {
00387     _periodic_timer->stop();
00388     if(_spinner)
00389     {
00390         _spinner->stop();
00391     }
00392     for(auto& it: _subscribers)
00393     {
00394         it.second.shutdown();
00395     }
00396     _subscribers.clear();
00397     _running = false;
00398     _node.reset();
00399     _spinner.reset();
00400 }
00401 
00402 DataStreamROS::~DataStreamROS()
00403 {
00404     shutdown();
00405 }
00406 
00407 bool DataStreamROS::xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
00408 {
00409     QDomElement stamp_elem = doc.createElement("use_header_stamp");
00410     stamp_elem.setAttribute("value", _config.use_header_stamp ? "true" : "false");
00411     plugin_elem.appendChild( stamp_elem );
00412 
00413     QDomElement rename_elem = doc.createElement("use_renaming_rules");
00414     rename_elem.setAttribute("value", _config.use_renaming_rules ? "true" : "false");
00415     plugin_elem.appendChild( rename_elem );
00416 
00417     QDomElement discard_elem = doc.createElement("discard_large_arrays");
00418     discard_elem.setAttribute("value", _config.discard_large_arrays ? "true" : "false");
00419     plugin_elem.appendChild( discard_elem );
00420 
00421     QDomElement max_elem = doc.createElement("max_array_size");
00422     max_elem.setAttribute("value", QString::number(_config.max_array_size));
00423     plugin_elem.appendChild( max_elem );
00424 
00425     return true;
00426 }
00427 
00428 bool DataStreamROS::xmlLoadState(const QDomElement &parent_element)
00429 {
00430     QDomElement stamp_elem = parent_element.firstChildElement( "use_header_stamp" );
00431     _config.use_header_stamp = ( stamp_elem.attribute("value") == "true");
00432 
00433     QDomElement rename_elem = parent_element.firstChildElement( "use_renaming_rules" );
00434     _config.use_renaming_rules = ( rename_elem.attribute("value") == "true");
00435 
00436     QDomElement discard_elem = parent_element.firstChildElement( "discard_large_arrays" );
00437     _config.discard_large_arrays = ( discard_elem.attribute("value") == "true");
00438 
00439     QDomElement max_elem = parent_element.firstChildElement( "max_array_size" );
00440     _config.max_array_size = max_elem.attribute("value").toInt();
00441 
00442     return true;
00443 }
00444 
00445 void DataStreamROS::addActionsToParentMenu(QMenu *menu)
00446 {
00447     _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"), menu);
00448     QIcon iconSave;
00449     iconSave.addFile(QStringLiteral(":/icons/resources/light/save.png"), QSize(26, 26));
00450     _action_saveIntoRosbag->setIcon(iconSave);
00451     menu->addAction( _action_saveIntoRosbag );
00452 
00453     connect( _action_saveIntoRosbag, &QAction::triggered, this, [this]()
00454     {
00455         DataStreamROS::saveIntoRosbag( *_destination_data );
00456     });
00457 }
00458 
00459 void DataStreamROS::saveDefaultSettings()
00460 {
00461     QSettings settings;
00462 
00463     settings.setValue("DataStreamROS/default_topics", _config.selected_topics);
00464     settings.setValue("DataStreamROS/use_renaming", _config.use_renaming_rules);
00465     settings.setValue("DataStreamROS/use_header_stamp", _config.use_header_stamp);
00466     settings.setValue("DataStreamROS/max_array_size", (int)_config.max_array_size);
00467     settings.setValue("DataStreamROS/discard_large_arrays", _config.discard_large_arrays);
00468 }
00469 
00470 
00471 void DataStreamROS::loadDefaultSettings()
00472 {
00473     QSettings settings;
00474     _config.selected_topics      = settings.value("DataStreamROS/default_topics", false ).toStringList();
00475     _config.use_header_stamp     = settings.value("DataStreamROS/use_header_stamp", false ).toBool();
00476     _config.use_renaming_rules   = settings.value("DataStreamROS/use_renaming", true ).toBool();
00477     _config.max_array_size       = settings.value("DataStreamROS/max_array_size", 100 ).toInt();
00478     _config.discard_large_arrays = settings.value("DataStreamROS/discard_large_arrays", true ).toBool();
00479 }
00480 
00481 


plotjuggler
Author(s): Davide Faconti
autogenerated on Wed Jul 3 2019 19:28:04