Program Listing for File sequential_compression_writer.hpp

Return to documentation for file (include/rosbag2_compression/sequential_compression_writer.hpp)

// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_
#define ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_

#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>

#include "rcpputils/thread_safety_annotations.hpp"

#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"

#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_storage/storage_interfaces/base_io_interface.hpp"

#include "rosbag2_compression/compression_options.hpp"

#include "base_compressor_interface.hpp"
#include "compression_factory.hpp"
#include "compression_options.hpp"
#include "visibility_control.hpp"

#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable:4251)
#endif

namespace rosbag2_compression
{

class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
  : public rosbag2_cpp::writers::SequentialWriter
{
public:
  explicit SequentialCompressionWriter(
    const rosbag2_compression::CompressionOptions & compression_options =
    rosbag2_compression::CompressionOptions());

  SequentialCompressionWriter(
    const rosbag2_compression::CompressionOptions & compression_options,
    std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory,
    std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
    std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory,
    std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io);

  ~SequentialCompressionWriter() override;

  void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;

  void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;

  void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;

  void open(
    const rosbag2_storage::StorageOptions & storage_options,
    const rosbag2_cpp::ConverterOptions & converter_options) override;

  void close() override;

protected:
  virtual void compress_file(
    BaseCompressorInterface & compressor,
    const std::string & file_relative_to_bag);

  virtual void compress_message(
    BaseCompressorInterface & compressor,
    std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);

  virtual void setup_compression();

  virtual void setup_compressor_threads();

  virtual void stop_compressor_threads();

private:
  std::shared_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};
  std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};
  std::mutex compressor_queue_mutex_;
  std::queue<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>
  compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
  std::queue<std::string> compressor_file_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
  std::vector<std::thread> compression_threads_;
  /* *INDENT-OFF* */  // uncrustify doesn't understand the macro + brace initializer
  std::atomic_bool compression_is_running_
    RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_) {false};
  /* *INDENT-ON* */
  std::recursive_mutex storage_mutex_;
  std::condition_variable compressor_condition_;

  rosbag2_compression::CompressionOptions compression_options_{};

  bool should_compress_last_file_{true};

  // Runs a while loop that pulls data from the compression queue until
  // compression_is_running_ is false; should be run in a separate thread
  void compression_thread_fn();

  // Closes the current backed storage and opens the next bagfile.
  void split_bagfile() override;

  // Checks if the current recording bagfile needs to be split and rolled over to a new file.
  bool should_split_bagfile(
    const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);

  // Prepares the metadata by setting initial values.
  void init_metadata() override;
};
}  // namespace rosbag2_compression
#endif  // ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_