Program Listing for File ReaderProxy.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/ReaderProxy.h)

// Copyright 2016-2019 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_WRITER_READERPROXY_H_
#define _FASTDDS_RTPS_WRITER_READERPROXY_H_

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/attributes/WriterAttributes.h>
#include <fastdds/rtps/attributes/RTPSParticipantAllocationAttributes.hpp>

#include <fastdds/rtps/builtin/data/ReaderProxyData.h>

#include <fastdds/rtps/common/Types.h>
#include <fastdds/rtps/common/Locator.h>
#include <fastdds/rtps/common/SequenceNumber.h>
#include <fastdds/rtps/common/CacheChange.h>
#include <fastdds/rtps/common/FragmentNumber.h>

#include <fastdds/rtps/writer/ChangeForReader.h>
#include <fastdds/rtps/writer/ReaderLocator.h>

#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <algorithm>
#include <mutex>
#include <set>
#include <atomic>

namespace eprosima {
namespace fastrtps {
namespace rtps {

class StatefulWriter;
class TimedEvent;
class RTPSReader;
class IDataSharingNotifier;
class RTPSGapBuilder;

class ReaderProxy
{
public:

    ~ReaderProxy();

    ReaderProxy(
            const WriterTimes& times,
            const RemoteLocatorsAllocationAttributes& loc_alloc,
            StatefulWriter* writer);

    void start(
            const ReaderProxyData& reader_attributes,
            bool is_datasharing = false);

    bool update(
            const ReaderProxyData& reader_attributes);

    void stop();

    void add_change(
            const ChangeForReader_t& change,
            bool is_relevant,
            bool restart_nack_supression);

    void add_change(
            const ChangeForReader_t& change,
            bool is_relevant,
            bool restart_nack_supression,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

    bool has_changes() const;

    bool change_is_acked(
            const SequenceNumber_t& seq_num) const;

    bool change_is_unsent(
            const SequenceNumber_t& seq_num,
            FragmentNumber_t& next_unsent_frag,
            SequenceNumber_t& gap_seq,
            const SequenceNumber_t& min_seq,
            bool& need_reactivate_periodic_heartbeat) const;

    void acked_changes_set(
            const SequenceNumber_t& seq_num);

    bool requested_changes_set(
            const SequenceNumberSet_t& seq_num_set,
            RTPSGapBuilder& gap_builder,
            const SequenceNumber_t& min_seq_in_history);

    bool process_initial_acknack(
            const std::function<void(ChangeForReader_t& change)>& func);

    void from_unsent_to_status(
            const SequenceNumber_t& seq_num,
            ChangeForReaderStatus_t status,
            bool restart_nack_supression,
            bool delivered = true);

    bool mark_fragment_as_sent_for_change(
            const SequenceNumber_t& seq_num,
            FragmentNumber_t frag_num,
            bool& was_last_fragment);

    bool perform_nack_supression();

    uint32_t perform_acknack_response(
            const std::function<void(ChangeForReader_t& change)>& func);

    void change_has_been_removed(
            const SequenceNumber_t& seq_num);

    bool has_unacknowledged(
            const SequenceNumber_t& first_seq_in_history) const;

    inline const GUID_t& guid() const
    {
        return locator_info_.remote_guid();
    }

    inline DurabilityKind_t durability_kind() const
    {
        return durability_kind_;
    }

    inline bool expects_inline_qos() const
    {
        return expects_inline_qos_;
    }

    inline bool is_reliable() const
    {
        return is_reliable_;
    }

    inline bool disable_positive_acks() const
    {
        return disable_positive_acks_;
    }

    inline bool is_remote_and_reliable() const
    {
        return !locator_info_.is_local_reader() && !locator_info_.is_datasharing_reader() && is_reliable_;
    }

    inline bool is_local_reader()
    {
        return locator_info_.is_local_reader();
    }

    inline RTPSReader* local_reader()
    {
        return locator_info_.local_reader();
    }

    bool check_and_set_acknack_count(
            uint32_t acknack_count)
    {
        if (last_acknack_count_ < acknack_count)
        {
            last_acknack_count_ = acknack_count;
            return true;
        }

        return false;
    }

    bool process_nack_frag(
            const GUID_t& reader_guid,
            uint32_t nack_count,
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t& fragments_state);

    bool rtps_is_relevant(
            CacheChange_t* change) const;

    SequenceNumber_t changes_low_mark() const
    {
        return changes_low_mark_;
    }

    void update_nack_supression_interval(
            const Duration_t& interval);

    LocatorSelectorEntry* general_locator_selector_entry()
    {
        return locator_info_.general_locator_selector_entry();
    }

    LocatorSelectorEntry* async_locator_selector_entry()
    {
        return locator_info_.async_locator_selector_entry();
    }

    RTPSMessageSenderInterface* message_sender()
    {
        return &locator_info_;
    }

    bool is_datasharing_reader() const
    {
        return locator_info_.is_datasharing_reader();
    }

    IDataSharingNotifier* datasharing_notifier()
    {
        return locator_info_.datasharing_notifier();
    }

    const IDataSharingNotifier* datasharing_notifier() const
    {
        return locator_info_.datasharing_notifier();
    }

    void datasharing_notify()
    {
        locator_info_.datasharing_notify();
    }

    size_t locators_size() const
    {
        return locator_info_.locators_size();
    }

    bool active() const
    {
        return active_;
    }

    void active(
            bool active)
    {
        active_ = active;
    }

private:

    bool is_active_;
    ReaderLocator locator_info_;
    DurabilityKind_t durability_kind_;
    bool expects_inline_qos_;
    bool is_reliable_;
    bool disable_positive_acks_;
    StatefulWriter* writer_;
    ResourceLimitedVector<ChangeForReader_t, std::true_type> changes_for_reader_;
    TimedEvent* nack_supression_event_;
    TimedEvent* initial_heartbeat_event_;
    std::atomic_bool timers_enabled_;
    uint32_t last_acknack_count_;
    uint32_t last_nackfrag_count_;

    SequenceNumber_t changes_low_mark_;

    bool active_ = false;

    using ChangeIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::iterator;
    using ChangeConstIterator = ResourceLimitedVector<ChangeForReader_t, std::true_type>::const_iterator;

    void disable_timers();

    /*
     * Converts all changes with a given status to a different status.
     * @param previous Status to change.
     * @param next Status to adopt.
     * @param func Function executed for each change which changes its status.
     * @return the number of changes that have been modified.
     */
    uint32_t convert_status_on_all_changes(
            ChangeForReaderStatus_t previous,
            ChangeForReaderStatus_t next,
            const std::function<void(ChangeForReader_t& change)>& func = {});

    bool requested_fragment_set(
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t& frag_set);

    void add_change(
            const ChangeForReader_t& change,
            bool is_relevant);

    ChangeIterator find_change(
            const SequenceNumber_t& seq_num,
            bool exact);

    ChangeConstIterator find_change(
            const SequenceNumber_t& seq_num) const;
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_WRITER_READERPROXY_H_ */