Program Listing for File sequential_compression_writer.hpp
↰ Return to documentation for file (include/rosbag2_compression/sequential_compression_writer.hpp
)
// Copyright 2020 Amazon.com, Inc. or its affiliates. All Rights Reserved.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_
#define ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_
#include <atomic>
#include <condition_variable>
#include <memory>
#include <mutex>
#include <queue>
#include <string>
#include <thread>
#include <unordered_map>
#include <vector>
#include "rcpputils/thread_safety_annotations.hpp"
#include "rosbag2_cpp/converter.hpp"
#include "rosbag2_cpp/converter_options.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory.hpp"
#include "rosbag2_cpp/serialization_format_converter_factory_interface.hpp"
#include "rosbag2_cpp/writers/sequential_writer.hpp"
#include "rosbag2_storage/metadata_io.hpp"
#include "rosbag2_storage/storage_factory.hpp"
#include "rosbag2_storage/storage_factory_interface.hpp"
#include "rosbag2_storage/storage_options.hpp"
#include "rosbag2_storage/storage_interfaces/base_io_interface.hpp"
#include "rosbag2_compression/compression_options.hpp"
#include "base_compressor_interface.hpp"
#include "compression_factory.hpp"
#include "compression_options.hpp"
#include "visibility_control.hpp"
#ifdef _WIN32
# pragma warning(push)
# pragma warning(disable:4251)
#endif
namespace rosbag2_compression
{
class ROSBAG2_COMPRESSION_PUBLIC SequentialCompressionWriter
: public rosbag2_cpp::writers::SequentialWriter
{
public:
explicit SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options =
rosbag2_compression::CompressionOptions());
SequentialCompressionWriter(
const rosbag2_compression::CompressionOptions & compression_options,
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory,
std::unique_ptr<rosbag2_storage::StorageFactoryInterface> storage_factory,
std::shared_ptr<rosbag2_cpp::SerializationFormatConverterFactoryInterface> converter_factory,
std::unique_ptr<rosbag2_storage::MetadataIo> metadata_io);
~SequentialCompressionWriter() override;
void create_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;
void remove_topic(const rosbag2_storage::TopicMetadata & topic_with_type) override;
void write(std::shared_ptr<rosbag2_storage::SerializedBagMessage> message) override;
void open(
const rosbag2_storage::StorageOptions & storage_options,
const rosbag2_cpp::ConverterOptions & converter_options) override;
void close() override;
protected:
virtual void compress_file(
BaseCompressorInterface & compressor,
const std::string & file_relative_to_bag);
virtual void compress_message(
BaseCompressorInterface & compressor,
std::shared_ptr<rosbag2_storage::SerializedBagMessage> message);
virtual void setup_compression();
virtual void setup_compressor_threads();
virtual void stop_compressor_threads();
private:
std::shared_ptr<rosbag2_compression::BaseCompressorInterface> compressor_{};
std::unique_ptr<rosbag2_compression::CompressionFactory> compression_factory_{};
std::mutex compressor_queue_mutex_;
std::queue<std::shared_ptr<rosbag2_storage::SerializedBagMessage>>
compressor_message_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
std::queue<std::string> compressor_file_queue_ RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_);
std::vector<std::thread> compression_threads_;
/* *INDENT-OFF* */ // uncrustify doesn't understand the macro + brace initializer
std::atomic_bool compression_is_running_
RCPPUTILS_TSA_GUARDED_BY(compressor_queue_mutex_) {false};
/* *INDENT-ON* */
std::recursive_mutex storage_mutex_;
std::condition_variable compressor_condition_;
rosbag2_compression::CompressionOptions compression_options_{};
bool should_compress_last_file_{true};
// Runs a while loop that pulls data from the compression queue until
// compression_is_running_ is false; should be run in a separate thread
void compression_thread_fn();
// Closes the current backed storage and opens the next bagfile.
void split_bagfile() override;
// Checks if the current recording bagfile needs to be split and rolled over to a new file.
bool should_split_bagfile(
const std::chrono::time_point<std::chrono::high_resolution_clock> & current_time);
// Prepares the metadata by setting initial values.
void init_metadata() override;
};
} // namespace rosbag2_compression
#endif // ROSBAG2_COMPRESSION__SEQUENTIAL_COMPRESSION_WRITER_HPP_