Program Listing for File RTPSMessageGroup.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/messages/RTPSMessageGroup.h
)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
#define _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastdds/rtps/messages/RTPSMessageSenderInterface.hpp>
#include <fastdds/rtps/messages/RTPSMessageCreator.h>
#include <fastdds/rtps/common/FragmentNumber.h>
#include <vector>
#include <chrono>
#include <cassert>
#include <memory>
namespace eprosima {
namespace fastrtps {
namespace rtps {
class RTPSParticipantImpl;
class Endpoint;
class RTPSMessageGroup_t;
class RTPSMessageGroup
{
public:
class timeout : public std::runtime_error
{
public:
timeout()
: std::runtime_error("timeout")
{
}
virtual ~timeout() = default;
};
class limit_exceeded : public std::runtime_error
{
public:
limit_exceeded()
: std::runtime_error("limit_exceeded")
{
}
virtual ~limit_exceeded() = default;
};
RTPSMessageGroup(
RTPSParticipantImpl* participant,
bool internal_buffer = false);
RTPSMessageGroup(
RTPSParticipantImpl* participant,
Endpoint* endpoint,
RTPSMessageSenderInterface* msg_sender,
std::chrono::steady_clock::time_point max_blocking_time_point =
std::chrono::steady_clock::now() + std::chrono::hours(24));
~RTPSMessageGroup() noexcept(false);
bool add_data(
const CacheChange_t& change,
bool expects_inline_qos);
bool add_data_frag(
const CacheChange_t& change,
const uint32_t fragment_number,
bool expects_inline_qos);
bool add_heartbeat(
const SequenceNumber_t& first_seq,
const SequenceNumber_t& last_seq,
Count_t count,
bool is_final,
bool liveliness_flag);
bool add_gap(
std::set<SequenceNumber_t>& changes_seq_numbers);
bool add_gap(
const SequenceNumber_t& gap_initial_sequence,
const SequenceNumberSet_t& gap_bitmap);
bool add_gap(
const SequenceNumber_t& gap_initial_sequence,
const SequenceNumberSet_t& gap_bitmap,
const GUID_t& reader_guid);
bool add_acknack(
const SequenceNumberSet_t& seq_num_set,
int32_t count,
bool final_flag);
bool add_nackfrag(
const SequenceNumber_t& seq_number,
FragmentNumberSet_t fn_state,
int32_t count);
void flush_and_reset();
void sender(
Endpoint* endpoint,
RTPSMessageSenderInterface* msg_sender)
{
assert((endpoint != nullptr && msg_sender != nullptr) || (endpoint == nullptr && msg_sender == nullptr));
if (endpoint != endpoint_ || msg_sender != sender_)
{
flush_and_reset();
}
endpoint_ = endpoint;
sender_ = msg_sender;
}
static inline constexpr uint32_t get_max_fragment_payload_size()
{
// Max fragment is 64KBytes_max - header - inlineqos - 3(for better alignment)
return std::numeric_limits<uint16_t>::max() - data_frag_header_size_ - max_inline_qos_size_ - 3;
}
void set_sent_bytes_limitation(
uint32_t limit)
{
sent_bytes_limitation_ = limit;
}
void reset_current_bytes_processed()
{
current_sent_bytes_ = 0;
}
inline uint32_t get_current_bytes_processed() const
{
return current_sent_bytes_ + full_msg_->length;
}
private:
static constexpr uint32_t data_frag_header_size_ = 28;
static constexpr uint32_t max_inline_qos_size_ = 32;
void reset_to_header();
void flush();
void send();
void check_and_maybe_flush()
{
check_and_maybe_flush(sender_->destination_guid_prefix());
}
void check_and_maybe_flush(
const GuidPrefix_t& destination_guid_prefix);
bool insert_submessage(
bool is_big_submessage)
{
return insert_submessage(sender_->destination_guid_prefix(), is_big_submessage);
}
bool insert_submessage(
const GuidPrefix_t& destination_guid_prefix,
bool is_big_submessage);
bool add_info_dst_in_buffer(
CDRMessage_t* buffer,
const GuidPrefix_t& destination_guid_prefix);
bool add_info_ts_in_buffer(
const Time_t& timestamp);
bool create_gap_submessage(
const SequenceNumber_t& gap_initial_sequence,
const SequenceNumberSet_t& gap_bitmap,
const EntityId_t& reader_id);
RTPSMessageSenderInterface* sender_ = nullptr;
Endpoint* endpoint_ = nullptr;
CDRMessage_t* full_msg_ = nullptr;
CDRMessage_t* submessage_msg_ = nullptr;
GuidPrefix_t current_dst_;
RTPSParticipantImpl* participant_ = nullptr;
#if HAVE_SECURITY
CDRMessage_t* encrypt_msg_ = nullptr;
#endif // if HAVE_SECURITY
std::chrono::steady_clock::time_point max_blocking_time_point_;
bool max_blocking_time_is_set_ = false;
std::unique_ptr<RTPSMessageGroup_t> send_buffer_;
bool internal_buffer_ = false;
uint32_t sent_bytes_limitation_ = 0;
uint32_t current_sent_bytes_ = 0;
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_RTPSMESSAGEGROUP_H_ */