Program Listing for File RTPSMessageGroup.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/messages/RTPSMessageGroup.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
#define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <fastdds/rtps/common/FragmentNumber.h>

#include <vector>
#include <chrono>
#include <cassert>
#include <memory>


namespace eprosima {
namespace fastrtps {
namespace rtps {

class RTPSParticipantImpl;
class Endpoint;
class RTPSMessageGroup_t;

class RTPSMessageGroup
{
public:

    class timeout : public std::runtime_error
    {
    public:

        timeout()
            : std::runtime_error("timeout")
        {
        }

        virtual ~timeout() = default;
    };

    class limit_exceeded : public std::runtime_error
    {
    public:

        limit_exceeded()
            : std::runtime_error("limit_exceeded")
        {
        }

        virtual ~limit_exceeded() = default;
    };

    RTPSMessageGroup(
            RTPSParticipantImpl* participant,
            bool internal_buffer = false);

    RTPSMessageGroup(
            RTPSParticipantImpl* participant,
            Endpoint* endpoint,
            RTPSMessageSenderInterface* msg_sender,
            std::chrono::steady_clock::time_point max_blocking_time_point =
            std::chrono::steady_clock::now() + std::chrono::hours(24));

    ~RTPSMessageGroup() noexcept(false);

    bool add_data(
            const CacheChange_t& change,
            bool expects_inline_qos);

    bool add_data_frag(
            const CacheChange_t& change,
            const uint32_t fragment_number,
            bool expects_inline_qos);

    bool add_heartbeat(
            const SequenceNumber_t& first_seq,
            const SequenceNumber_t& last_seq,
            Count_t count,
            bool is_final,
            bool liveliness_flag);

    bool add_gap(
            std::set<SequenceNumber_t>& changes_seq_numbers);

    bool add_gap(
            const SequenceNumber_t& gap_initial_sequence,
            const SequenceNumberSet_t& gap_bitmap);

    bool add_gap(
            const SequenceNumber_t& gap_initial_sequence,
            const SequenceNumberSet_t& gap_bitmap,
            const GUID_t& reader_guid);

    bool add_acknack(
            const SequenceNumberSet_t& seq_num_set,
            int32_t count,
            bool final_flag);

    bool add_nackfrag(
            const SequenceNumber_t& seq_number,
            FragmentNumberSet_t fn_state,
            int32_t count);

    void flush_and_reset();

    void sender(
            Endpoint* endpoint,
            RTPSMessageSenderInterface* msg_sender)
    {
        assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr));
        if (endpoint != endpoint_ || msg_sender != sender_)
        {
            flush_and_reset();
        }

        endpoint_ = endpoint;
        sender_ = msg_sender;
    }

    static inline constexpr uint32_t get_max_fragment_payload_size()
    {
        // Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment)
        return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
    }

    void set_sent_bytes_limitation(
            uint32_t limit)
    {
        sent_bytes_limitation_ = limit;
    }

    void reset_current_bytes_processed()
    {
        current_sent_bytes_ = 0;
    }

    inline uint32_t get_current_bytes_processed() const
    {
        return current_sent_bytes_ + full_msg_->length;
    }

private:

    static constexpr uint32_t data_frag_header_size_ = 28;
    static constexpr uint32_t max_inline_qos_size_ = 32;

    void reset_to_header();

    void flush();

    void send();

    void check_and_maybe_flush()
    {
        check_and_maybe_flush(sender_->destination_guid_prefix());
    }

    void check_and_maybe_flush(
            const GuidPrefix_t& destination_guid_prefix);

    bool insert_submessage(
            bool is_big_submessage)
    {
        return insert_submessage(sender_->destination_guid_prefix(), is_big_submessage);
    }

    bool insert_submessage(
            const GuidPrefix_t& destination_guid_prefix,
            bool is_big_submessage);

    bool add_info_dst_in_buffer(
            CDRMessage_t* buffer,
            const GuidPrefix_t& destination_guid_prefix);

    bool add_info_ts_in_buffer(
            const Time_t& timestamp);

    bool create_gap_submessage(
            const SequenceNumber_t& gap_initial_sequence,
            const SequenceNumberSet_t& gap_bitmap,
            const EntityId_t& reader_id);

    RTPSMessageSenderInterface* sender_ = nullptr;

    Endpoint* endpoint_ = nullptr;

    CDRMessage_t* full_msg_ = nullptr;

    CDRMessage_t* submessage_msg_ = nullptr;

    GuidPrefix_t current_dst_;

    RTPSParticipantImpl* participant_ = nullptr;

     #if HAVE_SECURITY

    CDRMessage_t* encrypt_msg_ = nullptr;

     #endif // if HAVE_SECURITY

    std::chrono::steady_clock::time_point max_blocking_time_point_;

    bool max_blocking_time_is_set_ = false;

    std::unique_ptr<RTPSMessageGroup_t> send_buffer_;

    bool internal_buffer_ = false;

    uint32_t sent_bytes_limitation_ = 0;

    uint32_t current_sent_bytes_ = 0;
};

}        /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */