00001 #include "datastream_ROS.h"
00002 #include <QTextStream>
00003 #include <QFile>
00004 #include <QMessageBox>
00005 #include <QDebug>
00006 #include <thread>
00007 #include <mutex>
00008 #include <chrono>
00009 #include <thread>
00010 #include <QProgressDialog>
00011 #include <QtGlobal>
00012 #include <QApplication>
00013 #include <QProcess>
00014 #include <QFileDialog>
00015 #include <ros/callback_queue.h>
00016 #include <rosbag/bag.h>
00017 #include <topic_tools/shape_shifter.h>
00018
00019 #include "../dialog_select_ros_topics.h"
00020 #include "../rule_editing.h"
00021 #include "../qnodedialog.h"
00022 #include "../shape_shifter_factory.hpp"
00023
00024 DataStreamROS::DataStreamROS():
00025 _action_saveIntoRosbag(nullptr),
00026 _node(nullptr)
00027 {
00028 _enabled = false;
00029 _running = false;
00030 _initial_time = std::numeric_limits<double>::max();
00031 _use_header_timestamp = true;
00032 }
00033
00034 PlotDataMap& DataStreamROS::getDataMap()
00035 {
00036 return _plot_data;
00037 }
00038
00039 void DataStreamROS::topicCallback(const topic_tools::ShapeShifter::ConstPtr& msg, const std::string &topic_name)
00040 {
00041 if( !_running || !_enabled){
00042 return;
00043 }
00044
00045 using namespace RosIntrospection;
00046
00047 const auto& md5sum = msg->getMD5Sum();
00048 const auto& datatype = msg->getDataType();
00049 const auto& definition = msg->getMessageDefinition() ;
00050
00051
00052 RosIntrospectionFactory::get().registerMessage(topic_name, md5sum, datatype, definition);
00053
00054 const RosIntrospection::ROSTypeList* type_map = RosIntrospectionFactory::get().getRosTypeList(topic_name);
00055 if( !type_map )
00056 {
00057 throw std::runtime_error("Can't retrieve the ROSTypeList from RosIntrospectionFactory");
00058 }
00059
00060
00061
00062
00063 static std::vector<uint8_t> buffer;
00064 static ROSTypeFlat flat_container;
00065 static RenamedValues renamed_value;
00066
00067 buffer.resize( msg->size() );
00068
00069 ros::serialization::OStream stream(buffer.data(), buffer.size());
00070 msg->write(stream);
00071
00072 SString topicname_SS( topic_name.data(), topic_name.length() );
00073
00074
00075 if( topicname_SS.at(0) == '/' ) topicname_SS = SString( topic_name.data() +1, topic_name.size()-1 );
00076
00077 buildRosFlatType( *type_map, datatype, topicname_SS,
00078 buffer.data(), &flat_container, 250);
00079
00080 applyNameTransform( _rules[datatype], flat_container, renamed_value );
00081
00082 SString header_stamp_field( topic_name );
00083 header_stamp_field.append(".header.stamp");
00084
00085 double msg_time = 0;
00086
00087
00088 if(_use_header_timestamp == false)
00089 {
00090 msg_time = ros::Time::now().toSec();
00091 }
00092 else{
00093 auto offset = FlatContainedContainHeaderStamp(renamed_value);
00094 if(offset){
00095 msg_time = offset.value();
00096 }
00097 else{
00098 msg_time = ros::Time::now().toSec();
00099 }
00100 }
00101
00102
00103
00104 {
00105 auto plot_pair = _plot_data.user_defined.find( md5sum );
00106 if( plot_pair == _plot_data.user_defined.end() )
00107 {
00108 PlotDataAnyPtr temp(new PlotDataAny(topic_name.c_str()));
00109 auto res = _plot_data.user_defined.insert( std::make_pair( topic_name, temp ) );
00110 plot_pair = res.first;
00111 }
00112 PlotDataAnyPtr& user_defined_data = plot_pair->second;
00113 user_defined_data->pushBack( PlotDataAny::Point(msg_time, nonstd::any(std::move(buffer)) ));
00114 }
00115
00116 PlotData::asyncPushMutex().lock();
00117 for(auto& it: renamed_value )
00118 {
00119 const std::string& field_name ( it.first.data() );
00120 const RosIntrospection::VarNumber& var_value = it.second;
00121 const double value = var_value.convert<double>();
00122 auto plot_it = _plot_data.numeric.find(field_name);
00123 if( plot_it == _plot_data.numeric.end())
00124 {
00125 auto res = _plot_data.numeric.insert(
00126 std::make_pair( field_name, std::make_shared<PlotData>(field_name.c_str()) ));
00127 plot_it = res.first;
00128 }
00129 plot_it->second->pushBackAsynchronously( PlotData::Point(msg_time, value));
00130 }
00131
00132
00133 {
00134 int& index = _msg_index[topic_name];
00135 index++;
00136 const std::string index_name = topic_name + ("/_MSG_INDEX_") ;
00137 auto index_it = _plot_data.numeric.find(index_name);
00138 if( index_it == _plot_data.numeric.end())
00139 {
00140 auto res = _plot_data.numeric.insert(
00141 std::make_pair( index_name, std::make_shared<PlotData>(index_name.c_str()) ));
00142 index_it = res.first;
00143 }
00144 index_it->second->pushBackAsynchronously( PlotData::Point(msg_time, index) );
00145 }
00146 PlotData::asyncPushMutex().unlock();
00147 }
00148
00149 void DataStreamROS::extractInitialSamples()
00150 {
00151 using namespace std::chrono;
00152 milliseconds wait_time_ms(1000);
00153
00154 QProgressDialog progress_dialog;
00155 progress_dialog.setLabelText( "Collecting ROS topic samples to understand data layout. ");
00156 progress_dialog.setRange(0, wait_time_ms.count());
00157 progress_dialog.setAutoClose(true);
00158 progress_dialog.setAutoReset(true);
00159
00160 progress_dialog.show();
00161
00162 auto start_time = system_clock::now();
00163
00164 _enabled = true;
00165 while ( system_clock::now() - start_time < (wait_time_ms) )
00166 {
00167 ros::getGlobalCallbackQueue()->callAvailable(ros::WallDuration(0.1));
00168 int i = duration_cast<milliseconds>(system_clock::now() - start_time).count() ;
00169 progress_dialog.setValue( i );
00170 QApplication::processEvents();
00171 if( progress_dialog.wasCanceled() )
00172 {
00173 break;
00174 }
00175 }
00176 _enabled = false;
00177
00178 if( progress_dialog.wasCanceled() == false )
00179 {
00180 progress_dialog.cancel();
00181 }
00182 }
00183
00184 void DataStreamROS::saveIntoRosbag()
00185 {
00186 if( _plot_data.user_defined.empty()){
00187 QMessageBox::warning(0, tr("Warning"), tr("Your buffer is empty. Nothing to save.\n") );
00188 return;
00189 }
00190
00191 QFileDialog saveDialog;
00192 saveDialog.setAcceptMode(QFileDialog::AcceptSave);
00193 saveDialog.setDefaultSuffix("bag");
00194 saveDialog.exec();
00195
00196 if(saveDialog.result() != QDialog::Accepted || saveDialog.selectedFiles().empty())
00197 {
00198 return;
00199 }
00200
00201 QString fileName = saveDialog.selectedFiles().first();
00202
00203 if( fileName.size() > 0)
00204 {
00205 rosbag::Bag rosbag(fileName.toStdString(), rosbag::bagmode::Write );
00206
00207 for (auto it: _plot_data.user_defined )
00208 {
00209 const std::string& topicname = it.first;
00210 const PlotDataAnyPtr& plotdata = it.second;
00211
00212 auto registered_msg_type = RosIntrospectionFactory::get().getShapeShifter(topicname);
00213 if(!registered_msg_type) continue;
00214
00215 RosIntrospection::ShapeShifter msg;
00216 msg.morph(registered_msg_type->getMD5Sum(),
00217 registered_msg_type->getDataType(),
00218 registered_msg_type->getMessageDefinition());
00219
00220 for (int i=0; i< plotdata->size(); i++)
00221 {
00222 const auto& point = plotdata->at(i);
00223 const PlotDataAny::TimeType msg_time = point.x;
00224 const nonstd::any& type_erased_buffer = point.y;
00225
00226 if(type_erased_buffer.type() != typeid( std::vector<uint8_t> ))
00227 {
00228
00229 continue;
00230 }
00231
00232 std::vector<uint8_t> raw_buffer = nonstd::any_cast<std::vector<uint8_t>>( type_erased_buffer );
00233 ros::serialization::IStream stream( raw_buffer.data(), raw_buffer.size() );
00234 msg.read( stream );
00235
00236 rosbag.write( topicname, ros::Time(msg_time), msg);
00237 }
00238 }
00239 rosbag.close();
00240
00241 QProcess process;
00242 QStringList args;
00243 args << "reindex" << fileName;
00244 process.start("rosbag" ,args);
00245 }
00246 }
00247
00248
00249 bool DataStreamROS::start(QString& default_configuration)
00250 {
00251 if( !_node )
00252 {
00253 _node = RosManager::getNode();
00254 }
00255
00256 if( !_node ){
00257 return false;
00258 }
00259
00260 _plot_data.numeric.clear();
00261 _plot_data.user_defined.clear();
00262 _initial_time = std::numeric_limits<double>::max();
00263
00264 using namespace RosIntrospection;
00265
00266 std::vector<std::pair<QString,QString>> all_topics;
00267 ros::master::V_TopicInfo topic_infos;
00268 ros::master::getTopics(topic_infos);
00269 for (ros::master::TopicInfo topic_info: topic_infos)
00270 {
00271 all_topics.push_back(
00272 std::make_pair(QString(topic_info.name.c_str()),
00273 QString(topic_info.datatype.c_str()) ) );
00274 }
00275
00276 QStringList default_topics = default_configuration.split(' ', QString::SkipEmptyParts);
00277
00278 QTimer timer;
00279 timer.setSingleShot(false);
00280 timer.setInterval( 1000);
00281 timer.start();
00282
00283 DialogSelectRosTopics dialog(all_topics, default_topics );
00284
00285 connect( &timer, &QTimer::timeout, [&]()
00286 {
00287 all_topics.clear();
00288 topic_infos.clear();
00289 ros::master::getTopics(topic_infos);
00290 for (ros::master::TopicInfo topic_info: topic_infos)
00291 {
00292 all_topics.push_back(
00293 std::make_pair(QString(topic_info.name.c_str()),
00294 QString(topic_info.datatype.c_str()) ) );
00295 }
00296 dialog.updateTopicList(all_topics);
00297 });
00298
00299 int res = dialog.exec();
00300
00301 timer.stop();
00302
00303 QStringList topic_selected = dialog.getSelectedItems();
00304 std::vector<std::string> std_topic_selected;
00305
00306 if( res != QDialog::Accepted || topic_selected.empty() )
00307 {
00308 return false;
00309 }
00310
00311 default_configuration.clear();
00312 for (const QString& topic :topic_selected )
00313 {
00314 default_configuration.append(topic).append(" ");
00315 std_topic_selected.push_back( topic.toStdString() );
00316 }
00317
00318
00319 if( dialog.checkBoxUseRenamingRules()->isChecked() ){
00320 _rules = RuleEditing::getRenamingRules();
00321 }
00322
00323 _use_header_timestamp = dialog.checkBoxUseHeaderStamp()->isChecked();
00324
00325
00326 _subscribers.clear();
00327
00328 for (int i=0; i<topic_selected.size(); i++ )
00329 {
00330 boost::function<void(const topic_tools::ShapeShifter::ConstPtr&) > callback;
00331 const std::string& topic_name = std_topic_selected[i];
00332 callback = [this, topic_name](const topic_tools::ShapeShifter::ConstPtr& msg) -> void
00333 {
00334 this->topicCallback(msg, topic_name) ;
00335 };
00336 _subscribers.push_back( _node->subscribe( topic_name, 0, callback) );
00337 }
00338
00339 _running = true;
00340
00341 extractInitialSamples();
00342
00343 _thread = std::thread([this](){ this->updateLoop();} );
00344
00345 return true;
00346 }
00347
00348 void DataStreamROS::enableStreaming(bool enable) { _enabled = enable; }
00349
00350 bool DataStreamROS::isStreamingEnabled() const { return _enabled; }
00351
00352
00353 void DataStreamROS::shutdown()
00354 {
00355 if( _running ){
00356 _running = false;
00357 _thread.join();
00358 }
00359
00360 for(ros::Subscriber& sub: _subscribers)
00361 {
00362 sub.shutdown();
00363 }
00364 _subscribers.clear();
00365 }
00366
00367 DataStreamROS::~DataStreamROS()
00368 {
00369 shutdown();
00370 }
00371
00372 void DataStreamROS::setParentMenu(QMenu *menu)
00373 {
00374 _menu = menu;
00375
00376 _action_saveIntoRosbag = new QAction(QString("Save cached value in a rosbag"), _menu);
00377 QIcon iconSave;
00378 iconSave.addFile(QStringLiteral(":/icons/resources/filesave@2x.png"), QSize(26, 26));
00379 _action_saveIntoRosbag->setIcon(iconSave);
00380
00381 _menu->addAction( _action_saveIntoRosbag );
00382
00383 connect( _action_saveIntoRosbag, &QAction::triggered, this, &DataStreamROS::saveIntoRosbag );
00384 }
00385
00386
00387 void DataStreamROS::updateLoop()
00388 {
00389 while (ros::ok() && _running)
00390 {
00391 ros::spinOnce();
00392 }
00393 }