datastream_ros2.cpp
Go to the documentation of this file.
1 #include "datastream_ros2.h"
2 
3 #include <QDebug>
4 #include <QTimer>
5 #include <QSettings>
6 #include <QMessageBox>
7 #include <QApplication>
8 #include <QProgressDialog>
9 #include "rclcpp/generic_subscription.hpp"
10 
11 // function taken from rosbag2_transport
12 
14  const std::string & topic_name, const std::vector<rclcpp::TopicEndpointInfo> & endpoints)
15 {
16  rclcpp::QoS request_qos(rmw_qos_profile_default.depth);
17 
18  if (endpoints.empty()) {
19  return request_qos;
20  }
21  size_t num_endpoints = endpoints.size();
22  size_t reliability_reliable_endpoints_count = 0;
23  size_t durability_transient_local_endpoints_count = 0;
24  for (const auto & endpoint : endpoints) {
25  const auto & profile = endpoint.qos_profile().get_rmw_qos_profile();
26  if (profile.reliability == RMW_QOS_POLICY_RELIABILITY_RELIABLE) {
27  reliability_reliable_endpoints_count++;
28  }
29  if (profile.durability == RMW_QOS_POLICY_DURABILITY_TRANSIENT_LOCAL) {
30  durability_transient_local_endpoints_count++;
31  }
32  }
33 
34  // Policy: reliability
35  if (reliability_reliable_endpoints_count == num_endpoints) {
36  request_qos.reliable();
37  } else {
38  request_qos.best_effort();
39  }
40 
41  // Policy: durability
42  // If all publishers offer transient_local, we can request it and receive latched messages
43  if (durability_transient_local_endpoints_count == num_endpoints) {
44  request_qos.transient_local();
45  } else {
46  request_qos.durability_volatile();
47  }
48 
49  return request_qos;
50 }
51 
53  DataStreamer(),
54  _node(nullptr),
55  _running(false),
56  _first_warning(false)
57 {
59 
60  _context = std::make_shared<rclcpp::Context>();
61  _context->init(0, nullptr);
62 
63  auto exec_args = rclcpp::ExecutorOptions();
64  exec_args.context = _context;
65  _executor = std::make_unique<rclcpp::executors::MultiThreadedExecutor>(exec_args, 2);
66 
67 }
68 
70 {
71  using namespace std::chrono;
72  milliseconds wait_time_ms(1000);
73 
74  QProgressDialog progress_dialog;
75  progress_dialog.setLabelText("Collecting ROS topic samples to understand data layout.");
76  progress_dialog.setRange(0, wait_time_ms.count());
77  progress_dialog.setAutoClose(true);
78  progress_dialog.setAutoReset(true);
79  progress_dialog.show();
80 
81  auto start_time = system_clock::now();
82 
83  while (system_clock::now() - start_time < (wait_time_ms))
84  {
85  int msec = duration_cast<milliseconds>(system_clock::now() - start_time).count();
86  progress_dialog.setValue(msec);
87  QApplication::processEvents();
88  if (progress_dialog.wasCanceled())
89  {
90  break;
91  }
92  }
93 
94  if (progress_dialog.wasCanceled() == false)
95  {
96  progress_dialog.cancel();
97  }
98 }
99 
100 bool DataStreamROS2::start(QStringList* selected_datasources)
101 {
102  if (!_node)
103  {
104  auto node_opts = rclcpp::NodeOptions();
105  node_opts.context(_context);
106  _node = std::make_shared<rclcpp::Node>("plotjuggler", node_opts);
107  _executor->add_node(_node);
108  }
109 
110  {
111  std::lock_guard<std::mutex> lock(mutex());
112  dataMap().numeric.clear();
113  dataMap().user_defined.clear();
114  _parser.clear();
115  }
116 
117  // Display the dialog which allows users to select ros topics to subscribe to,
118  // and continuously update the list of available topics
119  // We start with an empty topic list
120  std::vector<std::pair<QString, QString>> dialog_topics;
121  DialogSelectRosTopics dialog(dialog_topics, _config);
122 
123  QTimer update_list_timer;
124  update_list_timer.setSingleShot(false);
125  update_list_timer.setInterval(1000);
126  update_list_timer.start();
127 
128  auto getTopicsFromNode = [&]() {
129  dialog_topics.clear();
130  auto topic_list = _node->get_topic_names_and_types();
131  for (const auto& topic : topic_list)
132  {
133  // TODO: Handle topics with multiple types
134  auto topic_name = QString::fromStdString(topic.first);
135  auto type_name = QString::fromStdString(topic.second[0]);
136  dialog_topics.push_back({ topic_name, type_name });
137  dialog.updateTopicList(dialog_topics);
138  }
139  };
140 
141  getTopicsFromNode();
142 
143  connect(&update_list_timer, &QTimer::timeout, getTopicsFromNode);
144 
145  int res = dialog.exec();
146  _config = dialog.getResult();
147  update_list_timer.stop();
148 
149  // If no topics were selected, or the OK button was not pressed, do nothing
150  if (res != QDialog::Accepted || _config.topics.empty())
151  {
152  return false;
153  }
154 
157 
158  //--------- subscribe ---------
159  for (const auto& topic : dialog_topics)
160  {
161  if (_config.topics.contains(topic.first))
162  {
163  subscribeToTopic(topic.first.toStdString(), topic.second.toStdString());
164  }
165  }
166  //-----------------------------
167  _clock = rclcpp::Clock();
168  _start_time = _clock.now().nanoseconds();
169  _running = true;
170  _first_warning = true;
171 
172  _spinner = std::thread([this]() {
173  while (_running)
174  {
175  if (_executor)
176  {
177  _executor->spin_once(std::chrono::milliseconds(5));
178  }
179  }
180  });
181 
182  //-----------------------------
183  waitOneSecond();
184  return true;
185 }
186 
188 {
189  return _running;
190 }
191 
193 {
194  _running = false;
195  if (_spinner.joinable())
196  {
197  _spinner.join();
198  }
199 
200  _subscriptions.clear();
201  if (_node)
202  {
203  _executor->remove_node(_node);
204  _node.reset();
205  }
206 }
207 
209 {
210  shutdown();
211 }
212 
213 const std::vector<QAction*> &DataStreamROS2::availableActions()
214 {
215  static std::vector<QAction*> empty;
216  return empty;
217 }
218 
219 
220 void DataStreamROS2::subscribeToTopic(const std::string& topic_name, const std::string& topic_type)
221 {
222  if (_subscriptions.find(topic_name) != _subscriptions.end())
223  {
224  return;
225  }
226 
227  if(!_parser.hasParser(topic_name))
228  {
229  _parser.addParser(topic_name, CreateParserROS2(*parserFactories(), topic_name, topic_type, dataMap()));
230  }
231 
232  auto bound_callback = [=](std::shared_ptr<rclcpp::SerializedMessage> msg) { messageCallback(topic_name, msg); };
233 
234  auto publisher_info = _node->get_publishers_info_by_topic(topic_name);
235  auto detected_qos = adapt_request_to_offers(topic_name, publisher_info);
236 
237  // double subscription, latching or not
238  auto subscription = _node->create_generic_subscription(topic_name,
239  topic_type,
240  detected_qos,
241  bound_callback);
242  _subscriptions[topic_name] = subscription;
243  _node->get_node_topics_interface()->add_subscription(subscription, nullptr);
244 }
245 
246 void DataStreamROS2::messageCallback(const std::string& topic_name, std::shared_ptr<rclcpp::SerializedMessage> msg)
247 {
248  double timestamp = _node->get_clock()->now().seconds();
249  try
250  {
251  std::unique_lock<std::mutex> lock(mutex());
252 
253  auto msg_ptr = msg.get()->get_rcl_serialized_message();
254  PJ::MessageRef msg_ref( msg_ptr.buffer, msg_ptr.buffer_length );
255 
256  _parser.parseMessage(topic_name, msg_ref, timestamp);
257  }
258  catch (std::runtime_error& ex)
259  {
260  if( _first_warning ) {
261  _first_warning = false;
262  QMessageBox::warning(nullptr, tr("Error"),
263  QString("rosbag::open thrown an exception:\n") +
264  QString(ex.what()) + "\nThis message will be shown only once.");
265  }
266  }
267 
268  emit dataReceived();
269 }
270 
272 {
273  QSettings settings;
274  _config.saveToSettings(settings, "DataStreamROS2");
275 }
276 
278 {
279  QSettings settings;
280  _config.loadFromSettings(settings, "DataStreamROS2");
281 }
282 
283 bool DataStreamROS2::xmlLoadState(const QDomElement& parent_element)
284 {
285  _config.xmlLoadState(parent_element);
286  return true;
287 }
288 
289 bool DataStreamROS2::xmlSaveState(QDomDocument& doc,
290  QDomElement& parent_element) const
291 {
292  _config.xmlSaveState(doc, parent_element);
293  return true;
294 }
DataStreamROS2::_context
std::shared_ptr< rclcpp::Context > _context
Definition: datastream_ros2.h:45
DataStreamROS2::start
bool start(QStringList *selected_datasources) override
Definition: datastream_ros2.cpp:100
DataStreamROS2::isRunning
bool isRunning() const override
Definition: datastream_ros2.cpp:187
PJ::RosParserConfig::saveToSettings
void saveToSettings(QSettings &setting, QString prefix) const
Definition: parser_configuration.cpp:58
DataStreamROS2::messageCallback
void messageCallback(const std::string &topic_name, std::shared_ptr< rclcpp::SerializedMessage > msg)
Definition: datastream_ros2.cpp:246
DataStreamROS2::DataStreamROS2
DataStreamROS2()
Definition: datastream_ros2.cpp:52
PJ::CompositeParser::setConfig
void setConfig(const RosParserConfig &config)
Definition: parser_configuration.cpp:90
DataStreamROS2::_start_time
rcl_time_point_value_t _start_time
Definition: datastream_ros2.h:60
DataStreamROS2::xmlSaveState
bool xmlSaveState(QDomDocument &doc, QDomElement &parent_element) const override
Definition: datastream_ros2.cpp:289
DataStreamROS2::_spinner
std::thread _spinner
Definition: datastream_ros2.h:54
DataStreamROS2::_subscriptions
std::unordered_map< std::string, rclcpp::GenericSubscription::SharedPtr > _subscriptions
Definition: datastream_ros2.h:66
DataStreamROS2::~DataStreamROS2
~DataStreamROS2() override
Definition: datastream_ros2.cpp:208
msg
msg
DataStreamROS2::loadDefaultSettings
void loadDefaultSettings()
Definition: datastream_ros2.cpp:277
DataStreamROS2::_clock
rclcpp::Clock _clock
Definition: datastream_ros2.h:58
PJ::PlotDataMapRef::numeric
TimeseriesMap numeric
DialogSelectRosTopics
Definition: dialog_select_ros_topics.h:19
DataStreamROS2::xmlLoadState
bool xmlLoadState(const QDomElement &parent_element) override
Definition: datastream_ros2.cpp:283
DialogSelectRosTopics::updateTopicList
void updateTopicList(std::vector< std::pair< QString, QString >> topic_list)
Definition: dialog_select_ros_topics.cpp:81
timestamp
timestamp
topic
topic
DataStreamROS2::_first_warning
bool _first_warning
Definition: datastream_ros2.h:52
PJ::RosParserConfig::loadFromSettings
void loadFromSettings(const QSettings &settings, QString prefix)
Definition: parser_configuration.cpp:68
PJ::RosParserConfig::xmlLoadState
void xmlLoadState(const QDomElement &parent_element)
Definition: parser_configuration.cpp:35
PJ::RosParserConfig::xmlSaveState
void xmlSaveState(QDomDocument &doc, QDomElement &plugin_elem) const
Definition: parser_configuration.cpp:8
DataStreamROS2::_node
std::shared_ptr< rclcpp::Node > _node
Definition: datastream_ros2.h:47
PJ::CompositeParser::hasParser
bool hasParser(const std::string &topic_name)
Definition: parser_configuration.h:46
DataStreamROS2::availableActions
const std::vector< QAction * > & availableActions() override
Definition: datastream_ros2.cpp:213
PJ::DataStreamer::parserFactories
const ParserFactories * parserFactories() const
PJ::DataStreamer::dataMap
PlotDataMapRef & dataMap()
PJ::CompositeParser::clear
void clear()
Definition: parser_configuration.h:44
DataStreamROS2::waitOneSecond
void waitOneSecond()
Definition: datastream_ros2.cpp:69
DataStreamROS2::_parser
PJ::CompositeParser _parser
Definition: datastream_ros2.h:49
PJ::CompositeParser::parseMessage
bool parseMessage(const std::string &topic_name, MessageRef serialized_msg, double &timestamp)
Definition: parser_configuration.cpp:102
DataStreamROS2::saveDefaultSettings
void saveDefaultSettings()
Definition: datastream_ros2.cpp:271
CreateParserROS2
std::shared_ptr< PJ::MessageParser > CreateParserROS2(const PJ::ParserFactories &factories, const std::string &topic_name, const std::string &type_name, PJ::PlotDataMapRef &data)
Definition: ros2_parser.cpp:133
DataStreamROS2::shutdown
void shutdown() override
Definition: datastream_ros2.cpp:192
adapt_request_to_offers
rclcpp::QoS adapt_request_to_offers(const std::string &topic_name, const std::vector< rclcpp::TopicEndpointInfo > &endpoints)
Definition: datastream_ros2.cpp:13
DialogSelectRosTopics::getResult
PJ::RosParserConfig getResult() const
Definition: dialog_select_ros_topics.cpp:154
datastream_ros2.h
PJ::DataStreamer::dataReceived
void dataReceived()
PJ::MessageRef
DataStreamROS2::_running
bool _running
Definition: datastream_ros2.h:51
PJ::DataStreamer::mutex
std::mutex & mutex()
PJ::PlotDataMapRef::user_defined
AnySeriesMap user_defined
DataStreamROS2::_executor
std::unique_ptr< rclcpp::executors::MultiThreadedExecutor > _executor
Definition: datastream_ros2.h:46
PJ::RosParserConfig::topics
QStringList topics
Definition: parser_configuration.h:16
DataStreamROS2::_config
PJ::RosParserConfig _config
Definition: datastream_ros2.h:56
nullptr
#define nullptr
DataStreamROS2::subscribeToTopic
void subscribeToTopic(const std::string &topic_name, const std::string &topic_type)
Definition: datastream_ros2.cpp:220
PJ::CompositeParser::addParser
void addParser(const std::string &topic_name, std::shared_ptr< PJ::MessageParser > parser)
Definition: parser_configuration.cpp:78


plotjuggler_ros
Author(s): Davide Faconti
autogenerated on Sat May 24 2025 02:24:01