Program Listing for File ChangeForReader.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/ChangeForReader.h
)
// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_CHANGEFORREADER_H_
#define _FASTDDS_RTPS_CHANGEFORREADER_H_
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/FragmentNumber.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <cassert>
namespace eprosima {
namespace fastrtps {
namespace rtps {
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
enum ChangeForReaderStatus_t
{
UNSENT = 0,
REQUESTED = 1,
UNACKNOWLEDGED = 2,
ACKNOWLEDGED = 3,
UNDERWAY = 4
};
class ChangeForReader_t
{
friend struct ChangeForReaderCmp;
public:
explicit ChangeForReader_t(
CacheChange_t* change)
: status_(UNSENT)
, seq_num_(change->sequenceNumber)
, change_(change)
{
if (change->getFragmentSize() != 0)
{
unsent_fragments_.base(1u);
unsent_fragments_.add_range(1u, change->getFragmentCount() + 1u);
}
}
CacheChange_t* getChange() const
{
return change_;
}
void setStatus(
const ChangeForReaderStatus_t status)
{
status_ = status;
}
ChangeForReaderStatus_t getStatus() const
{
return status_;
}
const SequenceNumber_t getSequenceNumber() const
{
return seq_num_;
}
FragmentNumber_t get_next_unsent_fragment() const
{
if (unsent_fragments_.empty())
{
return change_->getFragmentCount() + 1;
}
return unsent_fragments_.min();
}
FragmentNumberSet_t getUnsentFragments() const
{
return unsent_fragments_;
}
void markAllFragmentsAsUnsent()
{
assert(nullptr != change_);
if (change_->getFragmentSize() != 0)
{
unsent_fragments_.base(1u);
unsent_fragments_.add_range(1u, change_->getFragmentCount() + 1u);
}
}
void markFragmentsAsSent(
const FragmentNumber_t& sentFragment)
{
unsent_fragments_.remove(sentFragment);
// We only use the running window mechanism during the first stage, until all fragments have been delivered
// once, and we consider the whole change as delivered.
if (!delivered_ && !unsent_fragments_.empty() && (unsent_fragments_.max() < change_->getFragmentCount()))
{
FragmentNumber_t base = unsent_fragments_.base();
FragmentNumber_t max = unsent_fragments_.max();
assert(!unsent_fragments_.is_set(base));
// Update base to first bit set
base = unsent_fragments_.min();
unsent_fragments_.base_update(base);
// Add all possible fragments
unsent_fragments_.add_range(max + 1u, change_->getFragmentCount() + 1u);
}
}
void markFragmentsAsUnsent(
const FragmentNumberSet_t& unsentFragments)
{
// Ignore NACK_FRAG messages during the first stage, until all fragments have been delivered once, and we
// consider the whole change as delivered.
if (delivered_)
{
if (unsent_fragments_.empty())
{
// Current window is empty, so we can set it to the received one.
unsent_fragments_ = unsentFragments;
}
else
{
// Update window to send the lowest possible requested fragments first.
FragmentNumber_t other_base = unsentFragments.base();
if (other_base < unsent_fragments_.base())
{
unsent_fragments_.base_update(other_base);
}
unsentFragments.for_each(
[this](
FragmentNumber_t element)
{
unsent_fragments_.add(element);
});
}
}
}
bool has_been_delivered() const
{
return delivered_;
}
void set_delivered()
{
delivered_ = true;
}
private:
ChangeForReaderStatus_t status_;
SequenceNumber_t seq_num_;
CacheChange_t* change_;
FragmentNumberSet_t unsent_fragments_;
bool delivered_ = false;
};
struct ChangeForReaderCmp
{
bool operator ()(
const ChangeForReader_t& a,
const ChangeForReader_t& b) const
{
return a.seq_num_ < b.seq_num_;
}
};
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
} // namespace rtps
} // namespace fastrtps
} // namespace eprosima
#endif /* _FASTDDS_RTPS_CHANGEFORREADER_H_ */