Program Listing for File CacheChange.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/common/CacheChange.h
)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_CACHECHANGE_H_
#define _FASTDDS_RTPS_CACHECHANGE_H_
#include <cassert>
#include <atomic>
#include <fastdds/rtps/common/ChangeKind_t.hpp>
#include <fastdds/rtps/common/FragmentNumber.h>
#include <fastdds/rtps/common/InstanceHandle.h>
#include <fastdds/rtps/common/SerializedPayload.h>
#include <fastdds/rtps/common/Time_t.h>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/common/WriteParams.h>
#include <fastdds/rtps/history/IPayloadPool.h>
namespace eprosima {
namespace fastrtps {
namespace rtps {
struct CacheChangeWriterInfo_t
{
size_t num_sent_submessages = 0;
CacheChange_t* volatile previous = nullptr;
CacheChange_t* volatile next = nullptr;
std::atomic_bool is_linked {false};
};
struct CacheChangeReaderInfo_t
{
Time_t receptionTimestamp;
int32_t disposed_generation_count;
int32_t no_writers_generation_count;
uint32_t writer_ownership_strength;
};
struct RTPS_DllAPI CacheChange_t
{
ChangeKind_t kind = ALIVE;
GUID_t writerGUID{};
InstanceHandle_t instanceHandle{};
SequenceNumber_t sequenceNumber{};
SerializedPayload_t serializedPayload{};
SerializedPayload_t inline_qos{};
bool isRead = false;
Time_t sourceTimestamp{};
union
{
CacheChangeReaderInfo_t reader_info;
CacheChangeWriterInfo_t writer_info;
};
WriteParams write_params{};
bool is_untyped_ = true;
CacheChange_t()
: writer_info()
{
inline_qos.encapsulation = DEFAULT_ENDIAN == LITTLEEND ? PL_CDR_LE : PL_CDR_BE;
}
CacheChange_t(
const CacheChange_t&) = delete;
const CacheChange_t& operator =(
const CacheChange_t&) = delete;
CacheChange_t(
uint32_t payload_size,
bool is_untyped = false)
: serializedPayload(payload_size)
, is_untyped_(is_untyped)
{
}
bool copy(
const CacheChange_t* ch_ptr)
{
kind = ch_ptr->kind;
writerGUID = ch_ptr->writerGUID;
instanceHandle = ch_ptr->instanceHandle;
sequenceNumber = ch_ptr->sequenceNumber;
sourceTimestamp = ch_ptr->sourceTimestamp;
reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
write_params = ch_ptr->write_params;
isRead = ch_ptr->isRead;
fragment_size_ = ch_ptr->fragment_size_;
fragment_count_ = ch_ptr->fragment_count_;
first_missing_fragment_ = ch_ptr->first_missing_fragment_;
return serializedPayload.copy(&ch_ptr->serializedPayload, !ch_ptr->is_untyped_);
}
void copy_not_memcpy(
const CacheChange_t* ch_ptr)
{
kind = ch_ptr->kind;
writerGUID = ch_ptr->writerGUID;
instanceHandle = ch_ptr->instanceHandle;
sequenceNumber = ch_ptr->sequenceNumber;
sourceTimestamp = ch_ptr->sourceTimestamp;
reader_info.receptionTimestamp = ch_ptr->reader_info.receptionTimestamp;
write_params = ch_ptr->write_params;
isRead = ch_ptr->isRead;
// Copy certain values from serializedPayload
serializedPayload.encapsulation = ch_ptr->serializedPayload.encapsulation;
// Copy fragment size and calculate fragment count
setFragmentSize(ch_ptr->fragment_size_, false);
}
virtual ~CacheChange_t()
{
if (payload_owner_ != nullptr)
{
payload_owner_->release_payload(*this);
}
assert(payload_owner_ == nullptr);
}
uint32_t getFragmentCount() const
{
return fragment_count_;
}
uint16_t getFragmentSize() const
{
return fragment_size_;
}
bool is_fully_assembled()
{
return first_missing_fragment_ >= fragment_count_;
}
bool contains_first_fragment()
{
return 0 < first_missing_fragment_;
}
void get_missing_fragments(
FragmentNumberSet_t& frag_sns)
{
// Note: Fragment numbers are 1-based but we keep them 0 based.
frag_sns.base(first_missing_fragment_ + 1);
// Traverse list of missing fragments, adding them to frag_sns
uint32_t current_frag = first_missing_fragment_;
while (current_frag < fragment_count_)
{
frag_sns.add(current_frag + 1);
current_frag = get_next_missing_fragment(current_frag);
}
}
void setFragmentSize(
uint16_t fragment_size,
bool create_fragment_list = false)
{
fragment_size_ = fragment_size;
fragment_count_ = 0;
first_missing_fragment_ = 0;
if (fragment_size > 0)
{
// This follows RTPS 8.3.7.3.5
fragment_count_ = (serializedPayload.length + fragment_size - 1) / fragment_size;
if (create_fragment_list)
{
// Keep index of next fragment on the payload portion at the beginning of each fragment. Last
// fragment will have fragment_count_ as 'next fragment index'
size_t offset = 0;
for (uint32_t i = 1; i <= fragment_count_; i++, offset += fragment_size_)
{
set_next_missing_fragment(i - 1, i); // index to next fragment in missing list
}
}
else
{
// List not created. This means we are going to send this change fragmented, so it is already
// assembled, and the missing list is empty (i.e. first missing points to fragment count)
first_missing_fragment_ = fragment_count_;
}
}
}
bool add_fragments(
const SerializedPayload_t& incoming_data,
uint32_t fragment_starting_num,
uint32_t fragments_in_submessage)
{
uint32_t original_offset = (fragment_starting_num - 1) * fragment_size_;
uint32_t incoming_length = fragment_size_ * fragments_in_submessage;
uint32_t last_fragment_index = fragment_starting_num + fragments_in_submessage - 1;
// Validate fragment indexes
if (last_fragment_index > fragment_count_)
{
return false;
}
// validate lengths
if (last_fragment_index < fragment_count_)
{
if (incoming_data.length < incoming_length)
{
return false;
}
}
else
{
incoming_length = serializedPayload.length - original_offset;
}
if (original_offset + incoming_length > serializedPayload.length)
{
return false;
}
if (received_fragments(fragment_starting_num - 1, fragments_in_submessage))
{
memcpy(
&serializedPayload.data[original_offset],
incoming_data.data, incoming_length);
}
return is_fully_assembled();
}
IPayloadPool const* payload_owner() const
{
return payload_owner_;
}
IPayloadPool* payload_owner()
{
return payload_owner_;
}
void payload_owner(
IPayloadPool* owner)
{
payload_owner_ = owner;
}
private:
// Fragment size
uint16_t fragment_size_ = 0;
// Number of fragments
uint32_t fragment_count_ = 0;
// First fragment in missing list
uint32_t first_missing_fragment_ = 0;
// Pool that created the payload of this cache change
IPayloadPool* payload_owner_ = nullptr;
uint32_t get_next_missing_fragment(
uint32_t fragment_index)
{
uint32_t* ptr = next_fragment_pointer(fragment_index);
return *ptr;
}
void set_next_missing_fragment(
uint32_t fragment_index,
uint32_t next_fragment_index)
{
uint32_t* ptr = next_fragment_pointer(fragment_index);
*ptr = next_fragment_index;
}
uint32_t* next_fragment_pointer(
uint32_t fragment_index)
{
size_t offset = fragment_size_;
offset *= fragment_index;
offset = (offset + 3u) & ~3u;
return reinterpret_cast<uint32_t*>(&serializedPayload.data[offset]);
}
bool received_fragments(
uint32_t initial_fragment,
uint32_t num_of_fragments)
{
bool at_least_one_changed = false;
if ((fragment_size_ > 0) && (initial_fragment < fragment_count_))
{
uint32_t last_fragment = initial_fragment + num_of_fragments;
if (last_fragment > fragment_count_)
{
last_fragment = fragment_count_;
}
if (initial_fragment <= first_missing_fragment_)
{
// Perform first = *first until first >= last_received
while (first_missing_fragment_ < last_fragment)
{
first_missing_fragment_ = get_next_missing_fragment(first_missing_fragment_);
at_least_one_changed = true;
}
}
else
{
// Find prev in missing list
uint32_t current_frag = first_missing_fragment_;
while (current_frag < initial_fragment)
{
uint32_t next_frag = get_next_missing_fragment(current_frag);
if (next_frag >= initial_fragment)
{
// This is the fragment previous to initial_fragment.
// Find future value for next by repeating next = *next until next >= last_fragment.
uint32_t next_missing_fragment = next_frag;
while (next_missing_fragment < last_fragment)
{
next_missing_fragment = get_next_missing_fragment(next_missing_fragment);
at_least_one_changed = true;
}
// Update next and finish loop
if (at_least_one_changed)
{
set_next_missing_fragment(current_frag, next_missing_fragment);
}
break;
}
current_frag = next_frag;
}
}
}
return at_least_one_changed;
}
};
} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
#endif /* _FASTDDS_RTPS_CACHECHANGE_H_ */