datastream_ros2.cpp
Go to the documentation of this file.
1 #include "datastream_ros2.h"
2 
3 #include <QDebug>
4 #include <QTimer>
5 #include <QSettings>
6 #include <QMessageBox>
7 #include <QApplication>
8 #include <QProgressDialog>
9 #include "rclcpp/generic_subscription.hpp"
10 #include "rosbag2_transport/qos.hpp"
11 
13  DataStreamer(),
14  _node(nullptr),
15  _running(false),
16  _first_warning(false)
17 {
19 
20  _context = std::make_shared<rclcpp::Context>();
21  _context->init(0, nullptr);
22 
23  auto exec_args = rclcpp::ExecutorOptions();
24  exec_args.context = _context;
25  _executor = std::make_unique<rclcpp::executors::MultiThreadedExecutor>(exec_args, 2);
26 
27 }
28 
30 {
31  using namespace std::chrono;
32  milliseconds wait_time_ms(1000);
33 
34  QProgressDialog progress_dialog;
35  progress_dialog.setLabelText("Collecting ROS topic samples to understand data layout.");
36  progress_dialog.setRange(0, wait_time_ms.count());
37  progress_dialog.setAutoClose(true);
38  progress_dialog.setAutoReset(true);
39  progress_dialog.show();
40 
41  auto start_time = system_clock::now();
42 
43  while (system_clock::now() - start_time < (wait_time_ms))
44  {
45  int msec = duration_cast<milliseconds>(system_clock::now() - start_time).count();
46  progress_dialog.setValue(msec);
47  QApplication::processEvents();
48  if (progress_dialog.wasCanceled())
49  {
50  break;
51  }
52  }
53 
54  if (progress_dialog.wasCanceled() == false)
55  {
56  progress_dialog.cancel();
57  }
58 }
59 
60 bool DataStreamROS2::start(QStringList* selected_datasources)
61 {
62  if (!_node)
63  {
64  auto node_opts = rclcpp::NodeOptions();
65  node_opts.context(_context);
66  _node = std::make_shared<rclcpp::Node>("plotjuggler", node_opts);
67  _executor->add_node(_node);
68  }
69 
70  {
71  std::lock_guard<std::mutex> lock(mutex());
72  dataMap().numeric.clear();
73  dataMap().user_defined.clear();
74  _parser.clear();
75  }
76 
77  // Display the dialog which allows users to select ros topics to subscribe to,
78  // and continuously update the list of available topics
79  // We start with an empty topic list
80  std::vector<std::pair<QString, QString>> dialog_topics;
81  DialogSelectRosTopics dialog(dialog_topics, _config);
82 
83  QTimer update_list_timer;
84  update_list_timer.setSingleShot(false);
85  update_list_timer.setInterval(1000);
86  update_list_timer.start();
87 
88  auto getTopicsFromNode = [&]() {
89  dialog_topics.clear();
90  auto topic_list = _node->get_topic_names_and_types();
91  for (const auto& topic : topic_list)
92  {
93  // TODO: Handle topics with multiple types
94  auto topic_name = QString::fromStdString(topic.first);
95  auto type_name = QString::fromStdString(topic.second[0]);
96  dialog_topics.push_back({ topic_name, type_name });
97  dialog.updateTopicList(dialog_topics);
98  }
99  };
100 
101  getTopicsFromNode();
102 
103  connect(&update_list_timer, &QTimer::timeout, getTopicsFromNode);
104 
105  int res = dialog.exec();
106  _config = dialog.getResult();
107  update_list_timer.stop();
108 
109  // If no topics were selected, or the OK button was not pressed, do nothing
110  if (res != QDialog::Accepted || _config.topics.empty())
111  {
112  return false;
113  }
114 
117 
118  //--------- subscribe ---------
119  for (const auto& topic : dialog_topics)
120  {
121  if (_config.topics.contains(topic.first))
122  {
123  subscribeToTopic(topic.first.toStdString(), topic.second.toStdString());
124  }
125  }
126  //-----------------------------
127  _clock = rclcpp::Clock();
128  _start_time = _clock.now().nanoseconds();
129  _running = true;
130  _first_warning = true;
131 
132  _spinner = std::thread([this]() {
133  while (_running)
134  {
135  if (_executor)
136  {
137  _executor->spin_once(std::chrono::milliseconds(5));
138  }
139  }
140  });
141 
142  //-----------------------------
143  waitOneSecond();
144  return true;
145 }
146 
148 {
149  return _running;
150 }
151 
153 {
154  _running = false;
155  if (_spinner.joinable())
156  {
157  _spinner.join();
158  }
159 
160  _subscriptions.clear();
161  if (_node)
162  {
163  _executor->remove_node(_node);
164  _node.reset();
165  }
166 }
167 
169 {
170  shutdown();
171 }
172 
173 const std::vector<QAction*> &DataStreamROS2::availableActions()
174 {
175  static std::vector<QAction*> empty;
176  return empty;
177 }
178 
179 
180 void DataStreamROS2::subscribeToTopic(const std::string& topic_name, const std::string& topic_type)
181 {
182  if (_subscriptions.find(topic_name) != _subscriptions.end())
183  {
184  return;
185  }
186 
187  if(!_parser.hasParser(topic_name))
188  {
189  _parser.addParser(topic_name, CreateParserROS2(*parserFactories(), topic_name, topic_type, dataMap()));
190  }
191 
192  auto bound_callback = [=](std::shared_ptr<rclcpp::SerializedMessage> msg) { messageCallback(topic_name, msg); };
193 
194  auto publisher_info = _node->get_publishers_info_by_topic(topic_name);
195  auto detected_qos = rosbag2_transport::Rosbag2QoS::adapt_request_to_offers(topic_name, publisher_info);
196 
197  // double subscription, latching or not
198  auto subscription = _node->create_generic_subscription(topic_name,
199  topic_type,
200  detected_qos,
201  bound_callback);
202  _subscriptions[topic_name] = subscription;
203  _node->get_node_topics_interface()->add_subscription(subscription, nullptr);
204 }
205 
206 void DataStreamROS2::messageCallback(const std::string& topic_name, std::shared_ptr<rclcpp::SerializedMessage> msg)
207 {
208  double timestamp = _node->get_clock()->now().seconds();
209  try
210  {
211  std::unique_lock<std::mutex> lock(mutex());
212 
213  auto msg_ptr = msg.get()->get_rcl_serialized_message();
214  PJ::MessageRef msg_ref( msg_ptr.buffer, msg_ptr.buffer_length );
215 
216  _parser.parseMessage(topic_name, msg_ref, timestamp);
217  }
218  catch (std::runtime_error& ex)
219  {
220  if( _first_warning ) {
221  _first_warning = false;
222  QMessageBox::warning(nullptr, tr("Error"),
223  QString("rosbag::open thrown an exception:\n") +
224  QString(ex.what()) + "\nThis message will be shown only once.");
225  }
226  }
227 
228  emit dataReceived();
229 }
230 
232 {
233  QSettings settings;
234  _config.saveToSettings(settings, "DataStreamROS2");
235 }
236 
238 {
239  QSettings settings;
240  _config.loadFromSettings(settings, "DataStreamROS2");
241 }
242 
243 bool DataStreamROS2::xmlLoadState(const QDomElement& parent_element)
244 {
245  _config.xmlLoadState(parent_element);
246  return true;
247 }
248 
249 bool DataStreamROS2::xmlSaveState(QDomDocument& doc,
250  QDomElement& parent_element) const
251 {
252  _config.xmlSaveState(doc, parent_element);
253  return true;
254 }
DataStreamROS2::_context
std::shared_ptr< rclcpp::Context > _context
Definition: datastream_ros2.h:45
DataStreamROS2::start
bool start(QStringList *selected_datasources) override
Definition: datastream_ros2.cpp:60
DataStreamROS2::isRunning
bool isRunning() const override
Definition: datastream_ros2.cpp:147
PJ::RosParserConfig::saveToSettings
void saveToSettings(QSettings &setting, QString prefix) const
Definition: parser_configuration.cpp:58
DataStreamROS2::messageCallback
void messageCallback(const std::string &topic_name, std::shared_ptr< rclcpp::SerializedMessage > msg)
Definition: datastream_ros2.cpp:206
DataStreamROS2::DataStreamROS2
DataStreamROS2()
Definition: datastream_ros2.cpp:12
PJ::CompositeParser::setConfig
void setConfig(const RosParserConfig &config)
Definition: parser_configuration.cpp:90
DataStreamROS2::_start_time
rcl_time_point_value_t _start_time
Definition: datastream_ros2.h:60
DataStreamROS2::xmlSaveState
bool xmlSaveState(QDomDocument &doc, QDomElement &parent_element) const override
Definition: datastream_ros2.cpp:249
DataStreamROS2::_spinner
std::thread _spinner
Definition: datastream_ros2.h:54
DataStreamROS2::_subscriptions
std::unordered_map< std::string, rclcpp::GenericSubscription::SharedPtr > _subscriptions
Definition: datastream_ros2.h:66
DataStreamROS2::~DataStreamROS2
~DataStreamROS2() override
Definition: datastream_ros2.cpp:168
type_name
std::string type_name(lua_State *L, type t)
msg
msg
DataStreamROS2::loadDefaultSettings
void loadDefaultSettings()
Definition: datastream_ros2.cpp:237
DataStreamROS2::_clock
rclcpp::Clock _clock
Definition: datastream_ros2.h:58
PJ::PlotDataMapRef::numeric
TimeseriesMap numeric
DialogSelectRosTopics
Definition: dialog_select_ros_topics.h:19
DataStreamROS2::xmlLoadState
bool xmlLoadState(const QDomElement &parent_element) override
Definition: datastream_ros2.cpp:243
DialogSelectRosTopics::updateTopicList
void updateTopicList(std::vector< std::pair< QString, QString >> topic_list)
Definition: dialog_select_ros_topics.cpp:81
topic
topic
DataStreamROS2::_first_warning
bool _first_warning
Definition: datastream_ros2.h:52
PJ::RosParserConfig::loadFromSettings
void loadFromSettings(const QSettings &settings, QString prefix)
Definition: parser_configuration.cpp:68
PJ::RosParserConfig::xmlLoadState
void xmlLoadState(const QDomElement &parent_element)
Definition: parser_configuration.cpp:35
PJ::RosParserConfig::xmlSaveState
void xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
Definition: parser_configuration.cpp:8
DataStreamROS2::_node
std::shared_ptr< rclcpp::Node > _node
Definition: datastream_ros2.h:47
PJ::CompositeParser::hasParser
bool hasParser(const std::string &topic_name)
Definition: parser_configuration.h:46
DataStreamROS2::availableActions
const std::vector< QAction * > & availableActions() override
Definition: datastream_ros2.cpp:173
PJ::DataStreamer::parserFactories
const ParserFactories * parserFactories() const
PJ::DataStreamer::dataMap
PlotDataMapRef & dataMap()
PJ::CompositeParser::clear
void clear()
Definition: parser_configuration.h:44
DataStreamROS2::waitOneSecond
void waitOneSecond()
Definition: datastream_ros2.cpp:29
DataStreamROS2::_parser
PJ::CompositeParser _parser
Definition: datastream_ros2.h:49
PJ::CompositeParser::parseMessage
bool parseMessage(const std::string &topic_name, MessageRef serialized_msg, double &timestamp)
Definition: parser_configuration.cpp:102
DataStreamROS2::saveDefaultSettings
void saveDefaultSettings()
Definition: datastream_ros2.cpp:231
CreateParserROS2
std::shared_ptr< PJ::MessageParser > CreateParserROS2(const PJ::ParserFactories &factories, const std::string &topic_name, const std::string &type_name, PJ::PlotDataMapRef &data)
Definition: ros2_parser.cpp:132
DataStreamROS2::shutdown
void shutdown() override
Definition: datastream_ros2.cpp:152
DialogSelectRosTopics::getResult
PJ::RosParserConfig getResult() const
Definition: dialog_select_ros_topics.cpp:154
datastream_ros2.h
PJ::DataStreamer::dataReceived
void dataReceived()
PJ::MessageRef
DataStreamROS2::_running
bool _running
Definition: datastream_ros2.h:51
PJ::DataStreamer::mutex
std::mutex & mutex()
PJ::PlotDataMapRef::user_defined
AnySeriesMap user_defined
DataStreamROS2::_executor
std::unique_ptr< rclcpp::executors::MultiThreadedExecutor > _executor
Definition: datastream_ros2.h:46
PJ::RosParserConfig::topics
QStringList topics
Definition: parser_configuration.h:16
DataStreamROS2::_config
PJ::RosParserConfig _config
Definition: datastream_ros2.h:56
nullptr
#define nullptr
DataStreamROS2::subscribeToTopic
void subscribeToTopic(const std::string &topic_name, const std::string &topic_type)
Definition: datastream_ros2.cpp:180
PJ::CompositeParser::addParser
void addParser(const std::string &topic_name, std::shared_ptr< PJ::MessageParser > parser)
Definition: parser_configuration.cpp:78


plotjuggler_ros
Author(s): Davide Faconti
autogenerated on Wed Feb 21 2024 03:22:55