Program Listing for File StatefulWriter.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/writer/StatefulWriter.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_STATEFULWRITER_H_
#define _FASTDDS_RTPS_STATEFULWRITER_H_

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/writer/RTPSWriter.h>
#include <fastdds/rtps/interfaces/IReaderDataFilter.hpp>
#include <fastdds/rtps/history/IChangePool.h>
#include <fastdds/rtps/history/IPayloadPool.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <condition_variable>
#include <mutex>

namespace eprosima {
namespace fastrtps {
namespace rtps {

// Forward declarations: within this header these types are only used through pointers, references
// and a friend declaration, so their full definitions are not required here.
class ReaderProxy;
class TimedEvent;

/**
 * Writer derived from RTPSWriter that keeps its matched readers as ReaderProxy pointers.
 *
 * The proxies are stored in three collections (remote, local and data-sharing readers).
 * getMatchedReadersSize() and for_each_reader_proxy() treat the three collections as a single set.
 *
 * All constructors are protected, so instances can only be created by derived classes or by the
 * friend classes declared below. Copy assignment is deleted. Member functions that have no inline
 * body in this header are defined elsewhere.
 */
class StatefulWriter : public RTPSWriter
{
    friend class RTPSParticipantImpl;
    friend class ReaderProxy;

public:

    //! Virtual destructor (defined out of line).
    virtual ~StatefulWriter();

protected:

    /**
     * Constructor overload that takes neither a payload pool nor a change pool.
     *
     * @param impl             Participant implementation creating this writer.
     * @param guid             GUID assigned to this writer.
     * @param att              Writer attributes.
     * @param flow_controller  Flow controller to be used by this writer.
     * @param hist             Writer history associated with this writer.
     * @param listen           Optional listener; defaults to nullptr.
     */
    StatefulWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    /**
     * Constructor overload that additionally receives a shared payload pool.
     *
     * @param impl             Participant implementation creating this writer.
     * @param guid             GUID assigned to this writer.
     * @param att              Writer attributes.
     * @param payload_pool     Shared pointer to the payload pool.
     * @param flow_controller  Flow controller to be used by this writer.
     * @param hist             Writer history associated with this writer.
     * @param listen           Optional listener; defaults to nullptr.
     */
    StatefulWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    /**
     * Constructor overload that receives both a shared payload pool and a shared change pool.
     *
     * @param impl             Participant implementation creating this writer.
     * @param guid             GUID assigned to this writer.
     * @param att              Writer attributes.
     * @param payload_pool     Shared pointer to the payload pool.
     * @param change_pool      Shared pointer to the change pool.
     * @param flow_controller  Flow controller to be used by this writer.
     * @param hist             Writer history associated with this writer.
     * @param listen           Optional listener; defaults to nullptr.
     */
    StatefulWriter(
            RTPSParticipantImpl* impl,
            const GUID_t& guid,
            const WriterAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            fastdds::rtps::FlowController* flow_controller,
            WriterHistory* hist,
            WriterListener* listen = nullptr);

    // NOTE(review): presumably recomputes the writer's internal status once the history has been
    // loaded (e.g. from persistent storage) - confirm against the implementation.
    void rebuild_status_after_load();

    /**
     * Virtual hook that receives the GUIDs and sequence numbers related to an acknack.
     * NOTE(review): judging by the name, it reports an acknack whose requested range is
     * inconsistent with the writer's next sequence number - confirm against the implementation.
     *
     * @param writer_guid                    GUID of the writer.
     * @param reader_guid                    GUID of the reader.
     * @param min_requested_sequence_number  Lowest sequence number requested.
     * @param max_requested_sequence_number  Highest sequence number requested.
     * @param next_sequence_number           Next sequence number of the writer.
     */
    virtual void print_inconsistent_acknack(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            const SequenceNumber_t& min_requested_sequence_number,
            const SequenceNumber_t& max_requested_sequence_number,
            const SequenceNumber_t& next_sequence_number);

private:

    // Private initialization helper taking the participant implementation and the writer attributes.
    // NOTE(review): presumably shared by the three constructors - confirm.
    void init(
            RTPSParticipantImpl* pimpl,
            const WriterAttributes& att);

    //! Timed event. NOTE(review): name suggests it drives the periodic heartbeat - confirm.
    TimedEvent* periodic_hb_event_;

    //! Timed event. NOTE(review): name suggests it drives the response to received NACKs - confirm.
    TimedEvent* nack_response_event_;

    //! Timed event. NOTE(review): name suggests it drives acknowledgement checks - confirm.
    TimedEvent* ack_event_;

    //! Heartbeat counter; pre-incremented by incrementHBCount() and read by getHeartbeatCount().
    Count_t m_heartbeatCount;
    //! Timing configuration of this writer.
    WriterTimes m_times;

    //! Remote reader proxies; counted by getMatchedReadersSize() and visited by for_each_reader_proxy().
    ResourceLimitedVector<ReaderProxy*> matched_remote_readers_;
    //! Proxies that are neither counted by getMatchedReadersSize() nor visited by for_each_reader_proxy().
    //! NOTE(review): presumably a pool of currently unmatched proxies kept for reuse - confirm.
    ResourceLimitedVector<ReaderProxy*> matched_readers_pool_;

    //! Iterator aliases over the reader proxy collections.
    using ReaderProxyIterator = ResourceLimitedVector<ReaderProxy*>::iterator;
    using ReaderProxyConstIterator = ResourceLimitedVector<ReaderProxy*>::const_iterator;

    // NOTE(review): the two sequence numbers below presumably track the next sequence number to be
    // notified as acknowledged by all readers and the lowest low mark among all readers - confirm.
    SequenceNumber_t next_all_acked_notify_sequence_;
    SequenceNumber_t min_readers_low_mark_;

    // TODO Merge this mutex into the main mutex once the main mutex is no longer recursive.
    std::mutex all_acked_mutex_;
    std::condition_variable all_acked_cond_;
    // TODO Also remove this flag once the main mutex is no longer recursive.
    bool all_acked_;
    // condition_variable_any can wait on any lockable type, not only std::unique_lock<std::mutex>.
    // NOTE(review): presumably waited on with the RecursiveTimedMutex lock received by
    // try_remove_change() / wait_for_acknowledgement() - confirm.
    std::condition_variable_any may_remove_change_cond_;
    unsigned int may_remove_change_;

public:

    /**
     * Overrides the base-class virtual.
     * NOTE(review): presumably invoked after a new change has been added to the history - confirm.
     *
     * @param p                  Pointer to the cache change.
     * @param max_blocking_time  Steady-clock time point bounding how long the call may block.
     */
    void unsent_change_added_to_history(
            CacheChange_t* p,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

    /**
     * Overrides the base-class virtual.
     * NOTE(review): presumably invoked when a change is removed from the history - confirm.
     *
     * @param a_change  Pointer to the cache change.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool change_removed_by_history(
            CacheChange_t* a_change) override;

    /**
     * Intra-process operation on a change for a given reader proxy.
     *
     * @param change        Pointer to the cache change.
     * @param reader_proxy  Proxy of the target reader.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool intraprocess_delivery(
            CacheChange_t* change,
            ReaderProxy* reader_proxy);

    /**
     * Single-sequence-number convenience overload: forwards to the range overload with
     * first_seq = seq_num and last_seq = seq_num + 1.
     *
     * @param reader_proxy  Proxy of the target reader.
     * @param seq_num       Sequence number of the gap.
     * @return Whatever the range overload returns.
     */
    bool intraprocess_gap(
            ReaderProxy* reader_proxy,
            const SequenceNumber_t& seq_num)
    {
        // seq_num + 1 is passed as last_seq, which suggests an exclusive upper bound
        // (NOTE(review): confirm against the range overload).
        SequenceNumber_t last_seq = seq_num + 1;
        return intraprocess_gap(reader_proxy, seq_num, last_seq);
    }

    /**
     * Range overload of the intra-process gap operation.
     *
     * @param reader_proxy  Proxy of the target reader.
     * @param first_seq     First sequence number of the range.
     * @param last_seq      End of the range (see the note in the single-sequence overload).
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool intraprocess_gap(
            ReaderProxy* reader_proxy,
            const SequenceNumber_t& first_seq,
            const SequenceNumber_t& last_seq);

    /**
     * Intra-process heartbeat operation for a given reader proxy.
     *
     * @param reader_proxy  Proxy of the target reader.
     * @param liveliness    Liveliness flag; defaults to false.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool intraprocess_heartbeat(
            ReaderProxy* reader_proxy,
            bool liveliness = false);

    //! Pre-increments the heartbeat counter and passes the new value to on_heartbeat().
    inline void incrementHBCount()
    {
        on_heartbeat(++m_heartbeatCount);
    }

    /**
     * Overrides the base-class virtual. NOTE(review): presumably adds a matched reader - confirm.
     *
     * @param data  Proxy data describing the reader.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool matched_reader_add(
            const ReaderProxyData& data) override;

    /**
     * Overrides the base-class virtual. NOTE(review): presumably removes a matched reader - confirm.
     *
     * @param reader_guid  GUID of the reader.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool matched_reader_remove(
            const GUID_t& reader_guid) override;

    /**
     * Overrides the base-class virtual.
     * NOTE(review): presumably tells whether the reader is currently matched - confirm.
     *
     * @param reader_guid  GUID of the reader.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool matched_reader_is_matched(
            const GUID_t& reader_guid) override;

    /**
     * Overrides the base-class virtual.
     * NOTE(review): presumably tells whether every matched reader has acknowledged the change - confirm.
     *
     * @param a_change  Pointer to the cache change.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool is_acked_by_all(
            const CacheChange_t* a_change) const override;

    /**
     * Calls a functor once for every matched reader proxy.
     *
     * The collections are visited in this order: local readers, data-sharing readers, remote readers.
     * Each proxy is passed to the functor as a `const ReaderProxy*`. No lock is taken by this
     * function (NOTE(review): callers presumably must already hold the writer mutex - confirm).
     *
     * @tparam Function  Callable type invocable with a `const ReaderProxy*`.
     * @param f          Functor to call; taken by value.
     * @return The functor after all invocations, so state accumulated in it can be retrieved.
     */
    template <typename Function>
    Function for_each_reader_proxy(
            Function f) const
    {
        // The proxies are handed to the functor as pointers-to-const instead of exposing the
        // collections' iterators or const_iterators, because through those the functor would be
        // able to modify the ReaderProxy elements.
        for ( const ReaderProxy* rp : matched_local_readers_ )
        {
            f(rp);
        }
        for ( const ReaderProxy* rp : matched_datasharing_readers_ )
        {
            f(rp);
        }
        for ( const ReaderProxy* rp : matched_remote_readers_ )
        {
            f(rp);
        }

        return f;
    }

    /**
     * Overrides the base-class virtual.
     * NOTE(review): presumably blocks until all changes are acknowledged or max_wait elapses - confirm.
     *
     * @param max_wait  Maximum duration to wait.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool wait_for_all_acked(
            const Duration_t& max_wait) override;

    // NOTE(review): presumably tells whether every matched reader is up to date - confirm.
    bool all_readers_updated();

    /**
     * Overrides the base-class virtual.
     *
     * @param max_blocking_time_point  Steady-clock time point bounding how long the call may block.
     * @param lock                     Unique lock over a RecursiveTimedMutex supplied by the caller.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool try_remove_change(
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) override;

    /**
     * Overrides the base-class virtual.
     *
     * @param seq                      Sequence number to wait for.
     * @param max_blocking_time_point  Steady-clock time point bounding how long the call may block.
     * @param lock                     Unique lock over a RecursiveTimedMutex supplied by the caller.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool wait_for_acknowledgement(
            const SequenceNumber_t& seq,
            const std::chrono::steady_clock::time_point& max_blocking_time_point,
            std::unique_lock<RecursiveTimedMutex>& lock) override;

    /**
     * Overrides the base-class virtual.
     *
     * @param att  Writer attributes to take the new values from.
     */
    void updateAttributes(
            const WriterAttributes& att) override;

    /**
     * Looks up a matched reader.
     * NOTE(review): RP looks like an output parameter receiving the proxy that was found, and
     * readerGuid is taken by non-const reference although it looks like an input - confirm both.
     *
     * @param readerGuid  GUID of the reader.
     * @param RP          Pointer to a ReaderProxy pointer.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool matched_reader_lookup(
            GUID_t& readerGuid,
            ReaderProxy** RP);

    //! Returns the current value of the heartbeat counter.
    inline Count_t getHeartbeatCount() const
    {
        return this->m_heartbeatCount;
    }

    //! Returns mp_RTPSParticipant (a member not declared in this class).
    inline RTPSParticipantImpl* getRTPSParticipant() const
    {
        return mp_RTPSParticipant;
    }

    //! Returns the total number of remote, local and data-sharing reader proxies.
    //! The sum is computed while holding mp_mutex (a member not declared in this class).
    inline size_t getMatchedReadersSize() const
    {
        std::lock_guard<RecursiveTimedMutex> guard(mp_mutex);
        return matched_remote_readers_.size()
               + matched_local_readers_.size()
               + matched_datasharing_readers_.size();
    }

    //! Returns the value of disable_positive_acks_.
    inline bool get_disable_positive_acks() const
    {
        return disable_positive_acks_;
    }

    /**
     * Receives new timing values for this writer.
     *
     * @param times  WriterTimes to take the values from.
     */
    void updateTimes(
            const WriterTimes& times);

    // NOTE(review): presumably the sequence number the next change will receive - confirm.
    SequenceNumber_t next_sequence_number() const;

    /**
     * Periodic heartbeat operation.
     *
     * @param final       Final flag; defaults to false.
     * @param liveliness  Liveliness flag; defaults to false.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool send_periodic_heartbeat(
            bool final = false,
            bool liveliness = false);

    /**
     * Heartbeat operation addressed to a single reader proxy.
     * NOTE(review): the `_nts` suffix presumably stands for "not thread-safe", i.e. the caller is
     * expected to hold the writer mutex - confirm.
     *
     * @param remoteReaderProxy  Proxy of the target reader.
     * @param liveliness         Liveliness flag; defaults to false.
     * @param force              Force flag; defaults to false.
     */
    void send_heartbeat_to_nts(
            ReaderProxy& remoteReaderProxy,
            bool liveliness = false,
            bool force = false);

    // NOTE(review): presumably the action run when the NACK response event fires - confirm.
    void perform_nack_response();

    /**
     * NACK suppression operation for a given reader ("supression" is the spelling used by the API).
     *
     * @param reader_guid  GUID of the reader.
     */
    void perform_nack_supression(
            const GUID_t& reader_guid);

    /**
     * Overrides the base-class virtual.
     *
     * @param writer_guid  GUID of the writer.
     * @param reader_guid  GUID of the reader.
     * @param ack_count    Count value carried by the acknack.
     * @param sn_set       Sequence number set carried by the acknack.
     * @param final_flag   Final flag carried by the acknack.
     * @param result       Output boolean, passed by non-const reference.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    bool process_acknack(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            uint32_t ack_count,
            const SequenceNumberSet_t& sn_set,
            bool final_flag,
            bool& result) override;

    /**
     * Overrides the base-class virtual.
     *
     * @param writer_guid      GUID of the writer.
     * @param reader_guid      GUID of the reader.
     * @param ack_count        Count value carried by the message.
     * @param seq_num          Sequence number the fragments belong to.
     * @param fragments_state  Fragment number set; note that it is taken by value.
     * @param result           Output boolean, passed by non-const reference.
     * @return Boolean result (exact meaning is defined by the implementation).
     */
    virtual bool process_nack_frag(
            const GUID_t& writer_guid,
            const GUID_t& reader_guid,
            uint32_t ack_count,
            const SequenceNumber_t& seq_num,
            const FragmentNumberSet_t fragments_state,
            bool& result) override;

    /**
     * Setter overload for the reader data filter; final override of the base-class virtual.
     *
     * @param filter  Pointer to the filter.
     */
    void reader_data_filter(
            fastdds::rtps::IReaderDataFilter* filter) final;

    //! Getter overload for the reader data filter; final override of the base-class virtual.
    const fastdds::rtps::IReaderDataFilter* reader_data_filter() const final;

    /**
     * Overrides the base-class virtual.
     *
     * @param cache_change       Pointer to the cache change.
     * @param group              RTPS message group to use.
     * @param locator_selector   Locator selector to use.
     * @param max_blocking_time  Steady-clock time point bounding how long the call may block.
     * @return Delivery return code.
     */
    DeliveryRetCode deliver_sample_nts(
            CacheChange_t* cache_change,
            RTPSMessageGroup& group,
            LocatorSelectorSender& locator_selector,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time) override;

    //! Returns a reference to locator_selector_general_.
    LocatorSelectorSender& get_general_locator_selector() override
    {
        return locator_selector_general_;
    }

    //! Returns a reference to locator_selector_async_.
    LocatorSelectorSender& get_async_locator_selector() override
    {
        return locator_selector_async_;
    }

private:

    // Private overload of is_acked_by_all() that takes a sequence number instead of a change.
    bool is_acked_by_all(
            const SequenceNumber_t seq) const;

    // Helper receiving a locator selector and a flag about creating sender resources.
    void update_reader_info(
            LocatorSelectorSender& locator_selector,
            bool create_sender_resources);

    // NOTE(review): presumably selects every matched reader as destination of the group - confirm.
    void select_all_readers_nts(
            RTPSMessageGroup& group,
            LocatorSelectorSender& locator_selector);

    // Heartbeat piggyback helper; last_bytes_processed is passed by non-const reference, so the
    // helper may update it.
    void send_heartbeat_piggyback_nts_(
            RTPSMessageGroup& message_group,
            LocatorSelectorSender& locator_selector,
            uint32_t& last_bytes_processed);

    // Heartbeat helper; `final` has no default value here, `liveliness` defaults to false.
    void send_heartbeat_nts_(
            size_t number_of_readers,
            RTPSMessageGroup& message_group,
            bool final,
            bool liveliness = false);

    // NOTE(review): presumably re-evaluates the acknowledgement status of the history - confirm.
    void check_acked_status();

    // NOTE(review): presumably the callback of ack_event_ - confirm.
    bool ack_timer_expired();

    // NOTE(review): presumably sends a heartbeat to every matched reader - confirm.
    void send_heartbeat_to_all_readers();

    // Per-destination delivery helpers (intra-process, data-sharing and network).
    void deliver_sample_to_intraprocesses(
            CacheChange_t* change);

    void deliver_sample_to_datasharing(
            CacheChange_t* change);

    DeliveryRetCode deliver_sample_to_network(
            CacheChange_t* change,
            RTPSMessageGroup& group,
            LocatorSelectorSender& locator_selector,
            const std::chrono::time_point<std::chrono::steady_clock>& max_blocking_time);

    void prepare_datasharing_delivery(
            CacheChange_t* change);

    // NOTE(review): presumably adds GAP information to the group for sequence numbers that are
    // missing from the history - confirm.
    void add_gaps_for_holes_in_history_(
            RTPSMessageGroup& group);

    //! NOTE(review): name suggests heartbeat piggybacking is turned off when true - confirm.
    bool disable_heartbeat_piggyback_;
    //! Value returned by get_disable_positive_acks().
    bool disable_positive_acks_;
    //! Keep duration, stored as a floating-point count of microseconds.
    std::chrono::duration<double, std::ratio<1, 1000000>> keep_duration_us_;
    SequenceNumber_t last_sequence_number_;
    SequenceNumber_t biggest_removed_sequence_number_;

    //! Constant for the lifetime of the object. NOTE(review): presumably a size in bytes - confirm.
    const uint32_t sendBufferSize_;

    //! Signed, unlike sendBufferSize_.
    int32_t currentUsageSendBufferSize_;

    bool there_are_remote_readers_ = false;
    bool there_are_local_readers_ = false;

    //! Copy assignment is disabled.
    StatefulWriter& operator =(
            const StatefulWriter&) = delete;

    //! Reader data filter pointer; null by default.
    fastdds::rtps::IReaderDataFilter* reader_data_filter_ = nullptr;
    //! Local reader proxies; counted by getMatchedReadersSize() and visited by for_each_reader_proxy().
    ResourceLimitedVector<ReaderProxy*> matched_local_readers_;
    //! Data-sharing reader proxies; counted by getMatchedReadersSize() and visited by for_each_reader_proxy().
    ResourceLimitedVector<ReaderProxy*> matched_datasharing_readers_;
    bool there_are_datasharing_readers_ = false;

    //! Locator selector returned by get_general_locator_selector().
    LocatorSelectorSender locator_selector_general_;

    //! Locator selector returned by get_async_locator_selector().
    LocatorSelectorSender locator_selector_async_;
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC
#endif /* _FASTDDS_RTPS_STATEFULWRITER_H_ */