Program Listing for File Rosbag2Dataset.h

Return to documentation for file (include/mola_input_rosbag2/Rosbag2Dataset.h)

/*               _
 _ __ ___   ___ | | __ _
| '_ ` _ \ / _ \| |/ _` | Modular Optimization framework for
| | | | | | (_) | | (_| | Localization and mApping (MOLA)
|_| |_| |_|\___/|_|\__,_| https://github.com/MOLAorg/mola

 Copyright (C) 2018-2026 Jose Luis Blanco, University of Almeria,
                         and individual contributors.
 SPDX-License-Identifier: GPL-3.0
 See LICENSE for full license information.
*/

#pragma once

#include <mola_kernel/interfaces/Dataset_UI.h>
#include <mola_kernel/interfaces/OfflineDatasetSource.h>
#include <mola_kernel/interfaces/RawDataSourceBase.h>
#include <mrpt/obs/CSensoryFrame.h>
#include <mrpt/serialization/CArchive.h>

#include <list>

// forward decls to isolate build dependencies downstream:
namespace tf2
{
class BufferCore;
}
namespace rosbag2_cpp::readers
{
class SequentialReader;
}
namespace rosbag2_storage
{
class SerializedBagMessage;
}

namespace mola
{
class Rosbag2Dataset : public RawDataSourceBase, public OfflineDatasetSource, public Dataset_UI
{
  DEFINE_MRPT_OBJECT(Rosbag2Dataset, mola)

 public:
  Rosbag2Dataset();

  // See docs in base class
  void spinOnce() override;

  // See docs in base class:
  size_t datasetSize() const override;

  mrpt::obs::CSensoryFrame::Ptr datasetGetObservations(size_t timestep) const override;

  // Virtual interface of Dataset_UI (see docs in derived class)
  size_t datasetUI_size() const override { return datasetSize(); }
  size_t datasetUI_lastQueriedTimestep() const override
  {
    auto lck = mrpt::lockHelper(dataset_ui_mtx_);
    return last_used_tim_index_;
  }
  double datasetUI_playback_speed() const override
  {
    auto lck = mrpt::lockHelper(dataset_ui_mtx_);
    return time_warp_scale_;
  }
  void datasetUI_playback_speed(double speed) override
  {
    auto lck         = mrpt::lockHelper(dataset_ui_mtx_);
    time_warp_scale_ = speed;
  }
  bool datasetUI_paused() const override
  {
    auto lck = mrpt::lockHelper(dataset_ui_mtx_);
    return paused_;
  }
  void datasetUI_paused(bool paused) override
  {
    auto lck = mrpt::lockHelper(dataset_ui_mtx_);
    paused_  = paused;
  }
  void datasetUI_teleport(size_t timestep) override
  {
    auto lck       = mrpt::lockHelper(dataset_ui_mtx_);
    teleport_here_ = timestep;
  }

 protected:
  // See docs in base class
  void initialize_rds(const Yaml& cfg) override;

 private:
  bool        initialized_ = false;
  std::string rosbag_filename_;
  std::string rosbag_storage_id_;  // (sqlite3|mcap) Empty = auto guess

  std::string rosbag_serialization_ = "cdr";
  std::string base_link_frame_id_   = "base_link";

  std::string tf_topic_ = "/tf";

  std::string tf_static_topic_ = "/tf_static";

  std::optional<mrpt::Clock::time_point> rosbag_begin_time_;
  size_t                                 read_ahead_length_ = 15;

  std::optional<mrpt::Clock::time_point> last_play_wallclock_time_;
  double                                 last_dataset_time_ = 0;

  std::shared_ptr<rosbag2_cpp::readers::SequentialReader> reader_;
  size_t                                                  bagMessageCount_ = 0;

  using SF = mrpt::obs::CSensoryFrame;

  SF::Ptr to_mrpt(const rosbag2_storage::SerializedBagMessage& rosmsg);

  void doReadAhead(
      const std::optional<size_t>& requestedIndex = std::nullopt, bool skipBufferAhead = false);

  // timestep in this class is just the index of the message in the rosbag:
  struct DatasetEntry
  {
    SF::Ptr obs;

    std::optional<mrpt::Clock::time_point> timestamp;
  };

  mutable std::vector<std::optional<DatasetEntry>> read_ahead_;
  size_t                                           rosbag_next_idx_       = 0;
  size_t                                           rosbag_next_idx_write_ = 0;
  std::set<std::string>                            already_pub_sensor_labels_;
  mutable std::deque<size_t>                       unload_queue_;

  // Methods and variables for the ROS->MRPT conversion
  // -------------------------------------------------------
  using Obs = std::vector<mrpt::obs::CObservation::Ptr>;

  using CallbackFunction = std::function<Obs(const rosbag2_storage::SerializedBagMessage&)>;

  std::map<std::string, std::vector<CallbackFunction>> lookup_;
  std::set<std::string>                                unhandledTopics_;

  std::shared_ptr<tf2::BufferCore> tfBuffer_;

  template <bool isStatic>
  Obs toTf(const rosbag2_storage::SerializedBagMessage& rosmsg);

#if MRPT_ROS2_BRIDGE_VERSION < 0x030400

  Obs toPointCloud2(
      std::string_view label, const rosbag2_storage::SerializedBagMessage& rosmsg,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose);

  Obs toLidar2D(
      std::string_view msg, const rosbag2_storage::SerializedBagMessage& rosmsg,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose);

  Obs toRotatingScan(
      std::string_view msg, const rosbag2_storage::SerializedBagMessage& rosmsg,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose);

  Obs toIMU(
      std::string_view msg, const rosbag2_storage::SerializedBagMessage& rosmsg,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose);

  Obs toGPS(
      std::string_view msg, const rosbag2_storage::SerializedBagMessage& rosmsg,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose);

  Obs toOdometry(std::string_view msg, const rosbag2_storage::SerializedBagMessage& rosmsg);

  Obs toImage(
      std::string_view msg, const rosbag2_storage::SerializedBagMessage& rosmsg,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose);

  bool findOutSensorPose(
      mrpt::poses::CPose3D& des, const std::string& target_frame, const std::string& source_frame,
      const std::optional<mrpt::poses::CPose3D>& fixedSensorPose, const std::string_view label);
#endif

  Obs catchExceptions(const std::function<Obs()>& f);

  void autoUnloadOldEntries() const;

  mutable timestep_t    last_used_tim_index_ = 0;
  bool                  paused_              = false;
  double                time_warp_scale_     = 1.0;
  std::optional<size_t> teleport_here_;
  mutable std::mutex    dataset_ui_mtx_;
};

}  // namespace mola