Program Listing for File StatelessReader.h

Return to documentation for file (/tmp/ws/src/fastrtps/include/fastdds/rtps/reader/StatelessReader.h)

// Copyright 2016 Proyectos y Sistemas de Mantenimiento SL (eProsima).
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
//     http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

#ifndef _FASTDDS_RTPS_READER_STATELESSREADER_H_
#define _FASTDDS_RTPS_READER_STATELESSREADER_H_

#ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#include <fastdds/rtps/reader/RTPSReader.h>
#include <fastrtps/utils/collections/ResourceLimitedVector.hpp>

#include <mutex>
#include <map>

namespace eprosima {
namespace fastrtps {
namespace rtps {

class StatelessReader : public RTPSReader
{
    friend class RTPSParticipantImpl;

public:

    virtual ~StatelessReader();

protected:

    StatelessReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    StatelessReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

    StatelessReader(
            RTPSParticipantImpl* pimpl,
            const GUID_t& guid,
            const ReaderAttributes& att,
            const std::shared_ptr<IPayloadPool>& payload_pool,
            const std::shared_ptr<IChangePool>& change_pool,
            ReaderHistory* hist,
            ReaderListener* listen = nullptr);

public:

    bool matched_writer_add(
            const WriterProxyData& wdata) override;

    bool matched_writer_remove(
            const GUID_t& writer_guid,
            bool removed_by_lease = false) override;

    bool matched_writer_is_matched(
            const GUID_t& writer_guid) override;

    bool change_removed_by_history(
            CacheChange_t* change,
            WriterProxy* prox = nullptr) override;

    bool processDataMsg(
            CacheChange_t* change) override;

    bool processDataFragMsg(
            CacheChange_t* change,
            uint32_t sampleSize,
            uint32_t fragmentStartingNum,
            uint16_t fragmentsInSubmessage) override;

    bool processHeartbeatMsg(
            const GUID_t& writerGUID,
            uint32_t hbCount,
            const SequenceNumber_t& firstSN,
            const SequenceNumber_t& lastSN,
            bool finalFlag,
            bool livelinessFlag) override;

    bool processGapMsg(
            const GUID_t& writerGUID,
            const SequenceNumber_t& gapStart,
            const SequenceNumberSet_t& gapList) override;

    bool change_received(
            CacheChange_t* a_change);

    bool nextUnreadCache(
            CacheChange_t** change,
            WriterProxy** wpout = nullptr) override;

    bool nextUntakenCache(
            CacheChange_t** change,
            WriterProxy** wpout = nullptr) override;

    inline size_t getMatchedWritersSize() const
    {
        return matched_writers_.size();
    }

    bool isInCleanState() override
    {
        return true;
    }

    inline RTPSParticipantImpl* getRTPSParticipant() const
    {
        return mp_RTPSParticipant;
    }

    void assert_writer_liveliness(
            const GUID_t& guid) override;

    bool begin_sample_access_nts(
            CacheChange_t* change,
            WriterProxy*& wp,
            bool& is_future_change) override;

    void end_sample_access_nts(
            CacheChange_t* change,
            WriterProxy*& wp,
            bool mark_as_read) override;

    void change_read_by_user(
            CacheChange_t* change,
            WriterProxy* writer,
            bool mark_as_read = true) override;

private:

    struct RemoteWriterInfo_t
    {
        GUID_t guid;
        GUID_t persistence_guid;
        bool has_manual_topic_liveliness = false;
        CacheChange_t* fragmented_change = nullptr;
        bool is_datasharing = false;
        uint32_t ownership_strength;
    };

    bool acceptMsgFrom(
            const GUID_t& entityId,
            ChangeKind_t change_kind);

    bool thereIsUpperRecordOf(
            const GUID_t& guid,
            const SequenceNumber_t& seq);

    bool writer_has_manual_liveliness(
            const GUID_t& guid);

    void remove_changes_from(
            const GUID_t& writerGUID,
            bool is_payload_pool_lost = false);


    ResourceLimitedVector<RemoteWriterInfo_t> matched_writers_;
};

} /* namespace rtps */
} /* namespace fastrtps */
} /* namespace eprosima */

#endif // ifndef DOXYGEN_SHOULD_SKIP_THIS_PUBLIC

#endif /* _FASTDDS_RTPS_READER_STATELESSREADER_H_ */