Program Listing for File StatefulReader.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/reader/StatefulReader.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_READER_STATEFULREADER_H_
#define _FASTDDS_RTPS_READER_STATEFULREADER_H_

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>
#include <fastdds/rtps/common/CDRMessage_t.h>
#include <fastdds/rtps/messages/RTPSMessageGroup.h>

#include <mutex>

namespace eprosima {
namespace fastrtps {
namespace rtps {

class WriterProxy;
class RTPSMessageSenderInterface;

class StatefulReader : public RTPSReader
{
public:

    friend class RTPSParticipantImpl;

    virtual ~StatefulReader();

protected:

    StatefulReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    StatefulReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    StatefulReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

public:

    bool matched_writer_add(
            const WriterProxyData& wdata) override;

    bool matched_writer_remove(
            const GUID_t& writer_guid,
            bool removed_by_lease = false) override;

    bool matched_writer_is_matched(
            const GUID_t& writer_guid) override;

    bool matched_writer_lookup(
            const GUID_t& writerGUID,
            WriterProxy** WP);

    bool processDataMsg(
            CacheChange_t* change) override;

    bool processDataFragMsg(
            CacheChange_t* change,
            uint32_t sampleSize,
            uint32_t fragmentStartingNum,
            uint16_t fragmentsInSubmessage) override;

    bool processHeartbeatMsg(
            const GUID_t& writerGUID,
            uint32_t hbCount,
            const SequenceNumber_t& firstSN,
            const SequenceNumber_t& lastSN,
            bool finalFlag,
            bool livelinessFlag) override;

    bool processGapMsg(
            const GUID_t& writerGUID,
            const SequenceNumber_t& gapStart,
            const SequenceNumberSet_t& gapList) override;

    bool change_removed_by_history(
            CacheChange_t* change,
            WriterProxy* prox = nullptr) override;

    bool change_received(
            CacheChange_t* a_change,
            WriterProxy* prox,
            size_t unknown_missing_changes_up_to);

    inline RTPSParticipantImpl* getRTPSParticipant() const
    {
        return mp_RTPSParticipant;
    }

    ResourceEvent& getEventResource() const;

    bool nextUnreadCache(
            CacheChange_t** change,
            WriterProxy** wpout = nullptr) override;

    bool nextUntakenCache(
            CacheChange_t** change,
            WriterProxy** wpout = nullptr) override;

    bool updateTimes(
            const ReaderTimes& times);

    inline ReaderTimes& getTimes()
    {
        return times_;
    }

    inline size_t getMatchedWritersSize() const
    {
        return matched_writers_.size();
    }

    bool isInCleanState() override;

    void send_acknack(
            const WriterProxy* writer,
            const SequenceNumberSet_t& sns,
            RTPSMessageSenderInterface* sender,
            bool is_final);

    void send_acknack(
            const WriterProxy* writer,
            RTPSMessageSenderInterface* sender,
            bool heartbeat_was_final);

    bool send_sync_nts(
            CDRMessage_t* message,
            const Locators& locators_begin,
            const Locators& locators_end,
            std::chrono::steady_clock::time_point& max_blocking_time_point);

    void assert_writer_liveliness(
            const GUID_t& writer) override;

    bool begin_sample_access_nts(
            CacheChange_t* change,
            WriterProxy*& wp,
            bool& is_future_change) override;

    void end_sample_access_nts(
            CacheChange_t* change,
            WriterProxy*& wp,
            bool mark_as_read) override;

    void change_read_by_user(
            CacheChange_t* change,
            WriterProxy* writer,
            bool mark_as_read = true) override;

private:

    void init(
            RTPSParticipantImpl* pimpl,
            const ReaderAttributes& att);

    bool acceptMsgFrom(
            const GUID_t& entityGUID,
            WriterProxy** wp) const;

    bool findWriterProxy(
            const GUID_t& writerGUID,
            WriterProxy** wp) const;

    void NotifyChanges(
            WriterProxy* wp);

    void remove_changes_from(
            const GUID_t& writerGUID,
            bool is_payload_pool_lost = false);

    uint32_t acknack_count_;
    uint32_t nackfrag_count_;
    ReaderTimes times_;
    ResourceLimitedVector<WriterProxy*> matched_writers_;
    ResourceLimitedVector<WriterProxy*> matched_writers_pool_;
    ResourceLimitedContainerConfig proxy_changes_config_;
    bool disable_positive_acks_;
    bool is_alive_;
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#endif // _FASTDDS_RTPS_READER_STATEFULREADER_H_