datastream_ROS.cpp
Go to the documentation of this file.
00001 #include "datastream_ROS.h"
00002 #include <QTextStream>
00003 #include <QFile>
00004 #include <QMessageBox>
00005 #include <QDebug>
00006 #include <thread>
00007 #include <mutex>
00008 #include <chrono>
00009 #include <thread>
00010 #include <QProgressDialog>
00011 #include <QtGlobal>
00012 #include <QApplication>
00013 #include <QProcess>
00014 #include <QFileDialog>
00015 #include <ros/callback_queue.h>
00016 #include <rosbag/bag.h>
00017 #include <topic_tools/shape_shifter.h>
00018 
00019 #include "../dialog_select_ros_topics.h"
00020 #include "../rule_editing.h"
00021 #include "../qnodedialog.h"
00022 #include "../shape_shifter_factory.hpp"
00023 
00024 DataStreamROS::DataStreamROS():
00025     _action_saveIntoRosbag(nullptr),
00026     _node(nullptr)
00027 {
00028     _enabled = false;
00029     _running = false;
00030     _initial_time = std::numeric_limits<double>::max();
00031     _use_header_timestamp = true;
00032 }
00033 
00034 PlotDataMap& DataStreamROS::getDataMap()
00035 {
00036     return _plot_data;
00037 }
00038 
00039 void DataStreamROS::topicCallback(const topic_tools::ShapeShifter::ConstPtr& msg, const std::string &topic_name)
00040 {
00041     if( !_running ||  !_enabled){
00042         return;
00043     }
00044 
00045     using namespace RosIntrospection;
00046 
00047     const auto&  md5sum     =  msg->getMD5Sum();
00048     const auto&  datatype   =  msg->getDataType();
00049     const auto&  definition =  msg->getMessageDefinition() ;
00050 
00051     // register the message type
00052     RosIntrospectionFactory::get().registerMessage(topic_name, md5sum, datatype, definition);
00053 
00054     const RosIntrospection::ROSTypeList* type_map = RosIntrospectionFactory::get().getRosTypeList(topic_name);
00055     if( !type_map )
00056     {
00057         throw std::runtime_error("Can't retrieve the ROSTypeList from RosIntrospectionFactory");
00058     }
00059 
00060     //------------------------------------
00061 
00062     // it is more efficient to recycle this elements
00063     static std::vector<uint8_t> buffer;
00064     static ROSTypeFlat flat_container;
00065     static RenamedValues renamed_value;
00066     
00067     buffer.resize( msg->size() );
00068     
00069     ros::serialization::OStream stream(buffer.data(), buffer.size());
00070     msg->write(stream);
00071 
00072     SString topicname_SS( topic_name.data(), topic_name.length() );
00073     // WORKAROUND. There are some problems related to renaming when the character / is
00074     // used as prefix. We will remove that here.
00075     if( topicname_SS.at(0) == '/' ) topicname_SS = SString( topic_name.data() +1,  topic_name.size()-1 );
00076 
00077     buildRosFlatType( *type_map, datatype, topicname_SS,
00078                       buffer.data(), &flat_container, 250);
00079 
00080     applyNameTransform( _rules[datatype], flat_container, renamed_value );
00081 
00082     SString header_stamp_field( topic_name );
00083     header_stamp_field.append(".header.stamp");
00084 
00085     double msg_time = 0;
00086 
00087     // detrmine the time offset
00088     if(_use_header_timestamp == false)
00089     {
00090         msg_time = ros::Time::now().toSec();
00091     }
00092     else{
00093         auto offset = FlatContainedContainHeaderStamp(renamed_value);
00094         if(offset){
00095             msg_time = offset.value();
00096         }
00097         else{
00098             msg_time = ros::Time::now().toSec();
00099         }
00100     }
00101 
00102     // adding raw serialized msg for future uses.
00103     // do this before msg_time normalization
00104     {
00105         auto plot_pair = _plot_data.user_defined.find( md5sum );
00106         if( plot_pair == _plot_data.user_defined.end() )
00107         {
00108             PlotDataAnyPtr temp(new PlotDataAny(topic_name.c_str()));
00109             auto res = _plot_data.user_defined.insert( std::make_pair( topic_name, temp ) );
00110             plot_pair = res.first;
00111         }
00112         PlotDataAnyPtr& user_defined_data = plot_pair->second;
00113         user_defined_data->pushBack( PlotDataAny::Point(msg_time, nonstd::any(std::move(buffer)) ));
00114     }
00115 
00116     PlotData::asyncPushMutex().lock();
00117     for(auto& it: renamed_value )
00118     {
00119         const std::string& field_name ( it.first.data() );
00120         const RosIntrospection::VarNumber& var_value = it.second;
00121         const double value = var_value.convert<double>();
00122         auto plot_it = _plot_data.numeric.find(field_name);
00123         if( plot_it == _plot_data.numeric.end())
00124         {
00125             auto res =   _plot_data.numeric.insert(
00126                         std::make_pair( field_name, std::make_shared<PlotData>(field_name.c_str()) ));
00127             plot_it = res.first;
00128         }
00129         plot_it->second->pushBackAsynchronously( PlotData::Point(msg_time, value));
00130     }
00131 
00132     //------------------------------
00133     {
00134         int& index = _msg_index[topic_name];
00135         index++;
00136         const std::string index_name = topic_name + ("/_MSG_INDEX_") ;
00137         auto index_it = _plot_data.numeric.find(index_name);
00138         if( index_it == _plot_data.numeric.end())
00139         {
00140             auto res = _plot_data.numeric.insert(
00141                         std::make_pair( index_name, std::make_shared<PlotData>(index_name.c_str()) ));
00142             index_it = res.first;
00143         }
00144         index_it->second->pushBackAsynchronously( PlotData::Point(msg_time, index) );
00145     }
00146     PlotData::asyncPushMutex().unlock();
00147 }
00148 
00149 void DataStreamROS::extractInitialSamples()
00150 {
00151     using namespace std::chrono;
00152     milliseconds wait_time_ms(1000);
00153 
00154     QProgressDialog progress_dialog;
00155     progress_dialog.setLabelText( "Collecting ROS topic samples to understand data layout. ");
00156     progress_dialog.setRange(0, wait_time_ms.count());
00157     progress_dialog.setAutoClose(true);
00158     progress_dialog.setAutoReset(true);
00159 
00160     progress_dialog.show();
00161 
00162     auto start_time = system_clock::now();
00163 
00164     _enabled = true;
00165     while ( system_clock::now() - start_time < (wait_time_ms) )
00166     {
00167         ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0.1));
00168         int i = duration_cast<milliseconds>(system_clock::now() - start_time).count() ;
00169         progress_dialog.setValue( i );
00170         QApplication::processEvents();
00171         if( progress_dialog.wasCanceled() )
00172         {
00173             break;
00174         }
00175     }
00176     _enabled = false;
00177 
00178     if( progress_dialog.wasCanceled() == false )
00179     {
00180         progress_dialog.cancel();
00181     }
00182 }
00183 
00184 void DataStreamROS::saveIntoRosbag()
00185 {
00186     if( _plot_data.user_defined.empty()){
00187         QMessageBox::warning(0, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n") );
00188         return;
00189     }
00190 
00191     QFileDialog saveDialog;
00192     saveDialog.setAcceptMode(QFileDialog::AcceptSave);
00193     saveDialog.setDefaultSuffix("bag");
00194     saveDialog.exec();
00195 
00196     if(saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
00197     {
00198         return;
00199     }
00200 
00201     QString fileName = saveDialog.selectedFiles().first();
00202 
00203     if( fileName.size() > 0)
00204     {
00205         rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write );
00206 
00207         for (auto it: _plot_data.user_defined )
00208         {
00209             const std::string& topicname = it.first;
00210             const PlotDataAnyPtr& plotdata = it.second;
00211 
00212             auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
00213             if(!registered_msg_type) continue;
00214 
00215             RosIntrospection::ShapeShifter msg;
00216             msg.morph(registered_msg_type->getMD5Sum(),
00217                       registered_msg_type->getDataType(),
00218                       registered_msg_type->getMessageDefinition());
00219 
00220             for (int i=0; i< plotdata->size(); i++)
00221             {
00222                 const auto& point = plotdata->at(i);
00223                 const PlotDataAny::TimeType msg_time  = point.x;
00224                 const nonstd::any& type_erased_buffer = point.y;
00225 
00226                 if(type_erased_buffer.type() != typeid( std::vector<uint8_t> ))
00227                 {
00228                   // can't cast to expected type
00229                   continue;
00230                 }
00231 
00232                 std::vector<uint8_t> raw_buffer =  nonstd::any_cast<std::vector<uint8_t>>( type_erased_buffer );
00233                 ros::serialization::IStream stream( raw_buffer.data(), raw_buffer.size() );
00234                 msg.read( stream );
00235 
00236                 rosbag.write( topicname, ros::Time(msg_time), msg);
00237             }
00238         }
00239         rosbag.close();
00240 
00241         QProcess process;
00242         QStringList args;
00243         args << "reindex" << fileName;
00244         process.start("rosbag" ,args);
00245     }
00246 }
00247 
00248 
00249 bool DataStreamROS::start(QString& default_configuration)
00250 {
00251     if( !_node )
00252     {
00253         _node =  RosManager::getNode();
00254     }
00255 
00256     if( !_node ){
00257         return false;
00258     }
00259 
00260     _plot_data.numeric.clear();
00261     _plot_data.user_defined.clear();
00262     _initial_time = std::numeric_limits<double>::max();
00263 
00264     using namespace RosIntrospection;
00265 
00266     std::vector<std::pair<QString,QString>> all_topics;
00267     ros::master::V_TopicInfo topic_infos;
00268     ros::master::getTopics(topic_infos);
00269     for (ros::master::TopicInfo topic_info: topic_infos)
00270     {
00271         all_topics.push_back(
00272                     std::make_pair(QString(topic_info.name.c_str()),
00273                                    QString(topic_info.datatype.c_str()) ) );
00274     }
00275 
00276     QStringList default_topics = default_configuration.split(' ', QString::SkipEmptyParts);
00277 
00278     QTimer timer;
00279     timer.setSingleShot(false);
00280     timer.setInterval( 1000);
00281     timer.start();
00282 
00283     DialogSelectRosTopics dialog(all_topics, default_topics );
00284 
00285     connect( &timer, &QTimer::timeout, [&]()
00286     {
00287         all_topics.clear();
00288         topic_infos.clear();
00289         ros::master::getTopics(topic_infos);
00290         for (ros::master::TopicInfo topic_info: topic_infos)
00291         {
00292             all_topics.push_back(
00293                         std::make_pair(QString(topic_info.name.c_str()),
00294                                        QString(topic_info.datatype.c_str()) ) );
00295         }
00296         dialog.updateTopicList(all_topics);
00297     });
00298 
00299     int res = dialog.exec();
00300 
00301     timer.stop();
00302 
00303     QStringList topic_selected = dialog.getSelectedItems();
00304     std::vector<std::string> std_topic_selected;
00305 
00306     if( res != QDialog::Accepted || topic_selected.empty() )
00307     {
00308         return false;
00309     }
00310 
00311     default_configuration.clear();
00312     for (const QString& topic :topic_selected )
00313     {
00314         default_configuration.append(topic).append(" ");
00315         std_topic_selected.push_back( topic.toStdString() );
00316     }
00317 
00318     // load the rules
00319     if( dialog.checkBoxUseRenamingRules()->isChecked() ){
00320         _rules = RuleEditing::getRenamingRules();
00321     }
00322 
00323     _use_header_timestamp = dialog.checkBoxUseHeaderStamp()->isChecked();
00324     //-------------------------
00325 
00326     _subscribers.clear();
00327 
00328     for (int i=0; i<topic_selected.size(); i++ )
00329     {
00330         boost::function<void(const topic_tools::ShapeShifter::ConstPtr&) > callback;
00331         const std::string& topic_name = std_topic_selected[i];
00332         callback = [this, topic_name](const topic_tools::ShapeShifter::ConstPtr& msg) -> void
00333         {
00334             this->topicCallback(msg, topic_name) ;
00335         };
00336         _subscribers.push_back( _node->subscribe( topic_name, 0,  callback)  );
00337     }
00338 
00339     _running = true;
00340 
00341     extractInitialSamples();
00342 
00343     _thread = std::thread([this](){ this->updateLoop();} );
00344 
00345     return true;
00346 }
00347 
00348 void DataStreamROS::enableStreaming(bool enable) { _enabled = enable; }
00349 
00350 bool DataStreamROS::isStreamingEnabled() const { return _enabled; }
00351 
00352 
00353 void DataStreamROS::shutdown()
00354 {
00355     if( _running ){
00356         _running = false;
00357         _thread.join();
00358     }
00359 
00360     for(ros::Subscriber& sub: _subscribers)
00361     {
00362         sub.shutdown();
00363     }
00364     _subscribers.clear();
00365 }
00366 
00367 DataStreamROS::~DataStreamROS()
00368 {
00369     shutdown();
00370 }
00371 
00372 void DataStreamROS::setParentMenu(QMenu *menu)
00373 {
00374     _menu = menu;
00375 
00376     _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"), _menu);
00377     QIcon iconSave;
00378     iconSave.addFile(QStringLiteral(":/icons/resources/filesave@2x.png"), QSize(26, 26));
00379     _action_saveIntoRosbag->setIcon(iconSave);
00380 
00381     _menu->addAction( _action_saveIntoRosbag );
00382 
00383     connect( _action_saveIntoRosbag, &QAction::triggered, this, &DataStreamROS::saveIntoRosbag );
00384 }
00385 
00386 
00387 void DataStreamROS::updateLoop()
00388 {
00389     while (ros::ok() && _running)
00390     {
00391         ros::spinOnce();
00392     }
00393 }


plotjuggler
Author(s): Davide Faconti
autogenerated on Fri Sep 1 2017 02:41:56