Program Listing for File ReaderProxy.h
↰ Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/ReaderProxy.h)
// Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.
#ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
#define _FASTDDS_RTPS_WRITER_READERPROXY_H_
#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#include <fastdds/rtps/attributes/WriterAttributes.h>
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>
#include <fastdds/rtps/builtin/data/ReaderProxyData.h>
#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/FragmentNumber.h>
#include <fastdds/rtps/writer/ChangeForReader.h>
#include <fastdds/rtps/writer/ReaderLocator.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstddef>
#include <cstdint>
#include <functional>
#include <mutex>
#include <set>
namespace eprosima {
namespace fastrtps {
namespace rtps {
// Forward declarations. Within this header these types are only used through
// pointers or references (StatefulWriter*, TimedEvent*, RTPSReader*,
// IDataSharingNotifier*, RTPSGapBuilder&), so their full definitions are not
// needed here.
class StatefulWriter;
class TimedEvent;
class RTPSReader;
class IDataSharingNotifier;
class RTPSGapBuilder;
/**
 * Per-reader state kept on the writer side: one ReaderProxy holds the locator
 * information, a few QoS-derived flags and the list of ChangeForReader_t
 * entries associated with one reader.
 *
 * NOTE(review): only the declarations and the inline accessors are visible in
 * this header. Descriptions of out-of-line members are derived from their
 * signatures and are marked as assumptions where the behaviour cannot be
 * confirmed from here.
 */
class ReaderProxy
{
public:

    ~ReaderProxy();

    /**
     * Constructor.
     * @param times     Writer timing configuration.
     * @param loc_alloc Allocation attributes for remote locators.
     * @param writer    Stateful writer creating this proxy
     *                  (presumably stored in writer_ - confirm in the implementation).
     */
    ReaderProxy(
            const WriterTimes& times,
            const RemoteLocatorsAllocationAttributes& loc_alloc,
            StatefulWriter* writer);

    /**
     * Start using this proxy for the reader described by @c reader_attributes.
     * @param reader_attributes Description of the reader.
     * @param is_datasharing    Data-sharing indication for the reader; defaults to false.
     */
    void start(
            const ReaderProxyData& reader_attributes,
            bool is_datasharing = false);

    /**
     * Update this proxy from a new description of the reader.
     * @param reader_attributes New description of the reader.
     * @return NOTE(review): presumably true when something actually changed - confirm.
     */
    bool update(
            const ReaderProxyData& reader_attributes);

    /**
     * Stop using this proxy (counterpart of start()).
     */
    void stop();

    /**
     * Register a change for this reader.
     * @param change                  Change information to add.
     * @param is_relevant             Relevance of the change for this reader.
     * @param restart_nack_supression Whether NACK suppression should be restarted.
     */
    void add_change(
            const ChangeForReader_t& change,
            bool is_relevant,
            bool restart_nack_supression);

    /**
     * Register a change for this reader, bounded by a deadline.
     * @param change                  Change information to add.
     * @param is_relevant             Relevance of the change for this reader.
     * @param restart_nack_supression Whether NACK suppression should be restarted.
     * @param max_blocking_time       Steady-clock time point passed as blocking limit.
     */
    void add_change(
            const ChangeForReader_t& change,
            bool is_relevant,
            bool restart_nack_supression,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

    /**
     * @return NOTE(review): presumably whether changes_for_reader_ is non-empty - confirm.
     */
    bool has_changes() const;

    /**
     * Query the acknowledgement state of a change.
     * @param seq_num Sequence number of the change.
     * @return Acknowledgement state for @c seq_num (semantics defined out of line).
     */
    bool change_is_acked(
            const SequenceNumber_t& seq_num) const;

    /**
     * Query whether a change is still unsent for this reader.
     * @param seq_num                             Sequence number of the change.
     * @param next_unsent_frag                    [out] Fragment number reported by the query.
     * @param gap_seq                             [out] Sequence number reported for gap handling.
     * @param min_seq                             Minimum sequence number to consider.
     * @param need_reactivate_periodic_heartbeat  [out] Heartbeat reactivation indication.
     * @return Unsent state for @c seq_num (semantics defined out of line).
     */
    bool change_is_unsent(
            const SequenceNumber_t& seq_num,
            FragmentNumber_t& next_unsent_frag,
            SequenceNumber_t& gap_seq,
            const SequenceNumber_t& min_seq,
            bool& need_reactivate_periodic_heartbeat) const;

    /**
     * Record an acknowledgement position.
     * @param seq_num Sequence number carried by the acknowledgement.
     *                NOTE(review): inclusive/exclusive bound not visible here - confirm.
     */
    void acked_changes_set(
            const SequenceNumber_t& seq_num);

    /**
     * Record the set of changes requested by the reader.
     * @param seq_num_set        Requested sequence numbers.
     * @param gap_builder        Gap builder handed to the implementation.
     * @param min_seq_in_history Minimum sequence number present in the writer history.
     * @return NOTE(review): presumably true when at least one change was marked - confirm.
     */
    bool requested_changes_set(
            const SequenceNumberSet_t& seq_num_set,
            RTPSGapBuilder& gap_builder,
            const SequenceNumber_t& min_seq_in_history);

    /**
     * Handle the first ACKNACK received from the reader.
     * @param func Callback receiving a ChangeForReader_t&.
     * @return Result defined out of line.
     */
    bool process_initial_acknack(
            const std::function<void(ChangeForReader_t& change)>& func);

    /**
     * Move a change out of the unsent state.
     * @param seq_num                 Sequence number of the change.
     * @param status                  Status to apply.
     * @param restart_nack_supression Whether NACK suppression should be restarted.
     * @param delivered               Delivery indication; defaults to true.
     */
    void from_unsent_to_status(
            const SequenceNumber_t& seq_num,
            ChangeForReaderStatus_t status,
            bool restart_nack_supression,
            bool delivered = true);

    /**
     * Mark one fragment of a change as sent.
     * @param seq_num           Sequence number of the change.
     * @param frag_num          Fragment number that was sent.
     * @param was_last_fragment [out] Last-fragment indication.
     * @return Result defined out of line.
     */
    bool mark_fragment_as_sent_for_change(
            const SequenceNumber_t& seq_num,
            FragmentNumber_t frag_num,
            bool& was_last_fragment);

    /**
     * Run the NACK suppression step.
     * @return Result defined out of line.
     */
    bool perform_nack_supression();

    /**
     * Run the ACKNACK response step.
     * @param func Callback receiving a ChangeForReader_t&.
     * @return Count defined out of line
     *         (NOTE(review): presumably the number of changes affected - confirm).
     */
    uint32_t perform_acknack_response(
            const std::function<void(ChangeForReader_t& change)>& func);

    /**
     * Notify this proxy that a change has been removed.
     * @param seq_num Sequence number of the removed change.
     */
    void change_has_been_removed(
            const SequenceNumber_t& seq_num);

    /**
     * Query for unacknowledged changes.
     * @param first_seq_in_history First sequence number present in the writer history.
     * @return Result defined out of line.
     */
    bool has_unacknowledged(
            const SequenceNumber_t& first_seq_in_history) const;

    /**
     * @return The GUID reported by the locator information (locator_info_.remote_guid()).
     */
    inline const GUID_t& guid() const
    {
        return locator_info_.remote_guid();
    }

    /**
     * @return The stored durability kind.
     */
    inline DurabilityKind_t durability_kind() const
    {
        return durability_kind_;
    }

    /**
     * @return The stored expects-inline-QoS flag.
     */
    inline bool expects_inline_qos() const
    {
        return expects_inline_qos_;
    }

    /**
     * @return The stored reliability flag.
     */
    inline bool is_reliable() const
    {
        return is_reliable_;
    }

    /**
     * @return The stored disable-positive-acks flag.
     */
    inline bool disable_positive_acks() const
    {
        return disable_positive_acks_;
    }

    /**
     * @return true only when the reader is reliable and the locator information
     *         reports it as neither a local reader nor a data-sharing reader.
     */
    inline bool is_remote_and_reliable() const
    {
        return !locator_info_.is_local_reader() && !locator_info_.is_datasharing_reader() && is_reliable_;
    }

    /**
     * @return Whether the locator information reports a local reader.
     *
     * Const-qualified: this is a pure query, and the same call on locator_info_
     * is already made from the const member is_remote_and_reliable().
     */
    inline bool is_local_reader() const
    {
        return locator_info_.is_local_reader();
    }

    /**
     * @return The reader pointer reported by the locator information
     *         (locator_info_.local_reader()).
     */
    inline RTPSReader* local_reader()
    {
        return locator_info_.local_reader();
    }

    /**
     * Accept an ACKNACK count only if it is strictly greater than the last one accepted.
     * @param acknack_count Count carried by the received ACKNACK.
     * @return true if the count was accepted (and stored), false otherwise.
     *
     * NOTE(review): a plain '<' comparison does not account for counter
     * wrap-around - confirm whether that matters for callers.
     */
    bool check_and_set_acknack_count(
            uint32_t acknack_count)
    {
        if (last_acknack_count_ < acknack_count)
        {
            last_acknack_count_ = acknack_count;
            return true;
        }

        return false;
    }

    /**
     * Process a NACK_FRAG message.
     * @param reader_guid     GUID of the reader that sent the message.
     * @param nack_count      Count carried by the message.
     * @param seq_num         Sequence number the message refers to.
     * @param fragments_state Fragment number set carried by the message.
     * @return Result defined out of line.
     */
    bool process_nack_frag(
            const GUID_t& reader_guid,
            uint32_t nack_count,
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t& fragments_state);

    /**
     * Relevance filter for a change.
     * @param change Change to evaluate.
     * @return Relevance of @c change for this reader (semantics defined out of line).
     */
    bool rtps_is_relevant(
            CacheChange_t* change) const;

    /**
     * @return A copy of the stored low mark (changes_low_mark_).
     */
    SequenceNumber_t changes_low_mark() const
    {
        return changes_low_mark_;
    }

    /**
     * Change the NACK suppression interval.
     * @param interval New interval.
     */
    void update_nack_supression_interval(
            const Duration_t& interval);

    /**
     * @return The general locator selector entry of the locator information.
     */
    LocatorSelectorEntry* general_locator_selector_entry()
    {
        return locator_info_.general_locator_selector_entry();
    }

    /**
     * @return The async locator selector entry of the locator information.
     */
    LocatorSelectorEntry* async_locator_selector_entry()
    {
        return locator_info_.async_locator_selector_entry();
    }

    /**
     * @return Pointer to locator_info_ seen as an RTPSMessageSenderInterface.
     *         The pointer refers to a member of this object, so it is only
     *         valid while this ReaderProxy exists.
     */
    RTPSMessageSenderInterface* message_sender()
    {
        return &locator_info_;
    }

    /**
     * @return Whether the locator information reports a data-sharing reader.
     */
    bool is_datasharing_reader() const
    {
        return locator_info_.is_datasharing_reader();
    }

    /**
     * @return The data-sharing notifier of the locator information.
     */
    IDataSharingNotifier* datasharing_notifier()
    {
        return locator_info_.datasharing_notifier();
    }

    /**
     * @return The data-sharing notifier of the locator information (const access).
     */
    const IDataSharingNotifier* datasharing_notifier() const
    {
        return locator_info_.datasharing_notifier();
    }

    /**
     * Forward a data-sharing notification to the locator information.
     */
    void datasharing_notify()
    {
        locator_info_.datasharing_notify();
    }

    /**
     * @return The number of locators reported by the locator information.
     */
    size_t locators_size() const
    {
        return locator_info_.locators_size();
    }

    /**
     * @return The current value of the active flag.
     */
    bool active() const
    {
        return active_;
    }

    /**
     * Set the active flag.
     * @param active New value of the flag.
     */
    void active(
            bool active)
    {
        active_ = active;
    }

private:

    // NOTE: member declaration order is significant (it fixes the initialization
    // order used by the out-of-line constructor) and must not be rearranged.

    //! NOTE(review): not referenced by any inline member in this header;
    //! active()/active(bool) use active_ instead - confirm whether both flags are needed.
    bool is_active_;
    //! Locator information; most inline accessors of this class forward to it.
    ReaderLocator locator_info_;
    //! Value returned by durability_kind().
    DurabilityKind_t durability_kind_;
    //! Value returned by expects_inline_qos().
    bool expects_inline_qos_;
    //! Value returned by is_reliable().
    bool is_reliable_;
    //! Value returned by disable_positive_acks().
    bool disable_positive_acks_;
    //! Stateful writer associated with this proxy (non-owning as far as this header shows).
    StatefulWriter* writer_;
    //! Collection of ChangeForReader_t entries kept for this reader.
    ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
    //! Timed event; presumably drives NACK suppression - confirm in the implementation.
    TimedEvent* nack_supression_event_;
    //! Timed event; presumably drives the initial heartbeat - confirm in the implementation.
    TimedEvent* initial_heartbeat_event_;
    //! Atomic flag; presumably cleared by disable_timers() - confirm in the implementation.
    std::atomic_bool timers_enabled_;
    //! Last count accepted by check_and_set_acknack_count().
    uint32_t last_acknack_count_;
    //! NOTE(review): presumably the NACK_FRAG counterpart used by process_nack_frag() - confirm.
    uint32_t last_nackfrag_count_;
    //! Value returned by changes_low_mark().
    SequenceNumber_t changes_low_mark_;
    //! Flag read and written through active() / active(bool); starts as false.
    bool active_ = false;

    using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
    using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;

    /**
     * Disable the timers of this proxy (behaviour defined out of line).
     */
    void disable_timers();

    /**
     * Converts all changes with a given status to a different status.
     * @param previous Status to change.
     * @param next     Status to adopt.
     * @param func     Function executed for each change which changes its status.
     *                 Defaults to an empty std::function.
     * @return the number of changes that have been modified.
     */
    uint32_t convert_status_on_all_changes(
            ChangeForReaderStatus_t previous,
            ChangeForReaderStatus_t next,
            const std::function<void(ChangeForReader_t& change)>& func = {});

    /**
     * Record the fragments requested for one change.
     * @param seq_num  Sequence number of the change.
     * @param frag_set Requested fragment numbers.
     * @return Result defined out of line.
     */
    bool requested_fragment_set(
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t& frag_set);

    /**
     * Private overload of add_change() without the NACK suppression argument.
     * @param change      Change information to add.
     * @param is_relevant Relevance of the change for this reader.
     */
    void add_change(
            const ChangeForReader_t& change,
            bool is_relevant);

    /**
     * Look up a change by sequence number (mutable access).
     * @param seq_num Sequence number to look for.
     * @param exact   NOTE(review): presumably selects exact-match vs. nearest-match lookup - confirm.
     * @return Iterator into the change collection.
     */
    ChangeIterator find_change(
            const SequenceNumber_t& seq_num,
            bool exact);

    /**
     * Look up a change by sequence number (read-only access).
     * @param seq_num Sequence number to look for.
     * @return Const iterator into the change collection.
     */
    ChangeConstIterator find_change(
            const SequenceNumber_t& seq_num) const;
};
} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */
#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */